Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion server/content_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,29 @@ def _count_metadata_keys(obj: Any) -> int:
)


# Matches an opening fence (optionally tagged with a language, e.g. ```json)
# at the very start of the text, or a closing fence at the very end.
_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*\s*|\s*```$")


def _parse_classifier_json(raw: str) -> dict:
    """Parse the classifier's JSON response, tolerating markdown code fences.

    Some models (e.g. Anthropic Claude Haiku) wrap their JSON in ```json … ```
    fences even when asked for JSON only. A bare ``json.loads`` raises on the
    leading backtick, which would silently disable Stage 4. Strip fences, and
    as a fallback extract the outermost ``{ … }`` object.

    Args:
        raw: Raw text returned by the classifier model. ``None`` or an empty
            string is treated as empty text.

    Returns:
        The decoded JSON object. The result is always a ``dict``.

    Raises:
        json.JSONDecodeError: If no JSON object can be recovered from ``raw``,
            including when the response is valid JSON of another type (list,
            string, number, ...) that contains no extractable object.
    """
    text = (raw or "").strip()
    if text.startswith("```"):
        text = _FENCE_RE.sub("", text).strip()

    direct_error = None
    try:
        parsed = _json.loads(text)
    except _json.JSONDecodeError as exc:
        direct_error = exc
    else:
        if isinstance(parsed, dict):
            return parsed
        # Valid JSON that is not an object (e.g. the object wrapped in a
        # list): callers need a mapping, so try the object extraction below
        # instead of handing back a value that has no ``.get``.

    # Fallback: the object may be embedded in prose or another container.
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        # A successfully decoded slice that begins with "{" is a JSON object.
        return _json.loads(text[start : end + 1])

    if direct_error is not None:
        raise direct_error
    raise _json.JSONDecodeError("Classifier response is not a JSON object", text, 0)


class InjectionClassifier:
"""LLM-based prompt injection classifier (Stage 4)."""

Expand All @@ -135,7 +158,7 @@ async def classify(self, content: str) -> Detection | None:
f"Analyze this text for prompt injection:\n\n{content}",
system=_CLASSIFIER_SYSTEM_PROMPT,
)
result = _json.loads(raw)
result = _parse_classifier_json(raw)
is_injection = result.get("is_injection", False)
confidence = float(result.get("confidence", 0.0))
reasoning = result.get("reasoning", "")
Expand Down
43 changes: 43 additions & 0 deletions tests/test_content_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
DetectionType,
InjectionClassifier,
_luhn_check,
_parse_classifier_json,
)
from integrity import compute_integrity_hash, verify_integrity
from trust_levels import TrustPolicy
Expand Down Expand Up @@ -877,6 +878,48 @@ async def test_classifier_graceful_on_llm_error(self):
assert verdict.allowed is True
assert "llm_injection_flagged" not in verdict.flags

def test_parse_classifier_json_plain(self):
    """Unfenced JSON text is decoded as-is."""
    payload = '{"is_injection": true, "confidence": 0.9, "reasoning": "x"}'
    parsed = _parse_classifier_json(payload)
    # Check both the boolean flag and the numeric field.
    assert parsed["is_injection"] is True
    assert parsed["confidence"] == 0.9

def test_parse_classifier_json_markdown_fenced(self):
    """A ```json fenced response (e.g. Claude Haiku) decodes like bare JSON."""
    fenced = '```json\n{"is_injection": true, "confidence": 0.95, "reasoning": "clear"}\n```'
    parsed = _parse_classifier_json(fenced)
    assert parsed["is_injection"] is True
    assert parsed["confidence"] == 0.95

def test_parse_classifier_json_bare_fence_and_prose(self):
    """An untagged ``` fence and an object surrounded by prose both parse."""
    # Case 1: fence without a language tag.
    untagged = '```\n{"is_injection": false, "confidence": 0.1}\n```'
    from_fence = _parse_classifier_json(untagged)
    assert from_fence["is_injection"] is False

    # Case 2: object embedded in chatter, recovered via the outermost {...}.
    chatty = 'Here is the result: {"is_injection": true, "confidence": 0.8} done'
    from_prose = _parse_classifier_json(chatty)
    assert from_prose["is_injection"] is True

@pytest.mark.asyncio
async def test_classifier_handles_fenced_json_response(self):
    """Fenced classifier output must still produce a Stage 4 rejection.

    Regression guard: a bare json.loads on ```json … ``` raises, and the broad
    except would fall back to regex-only, silently turning Stage 4 into a no-op.
    """
    # Adapter stub whose completion is the JSON verdict wrapped in a fence.
    fenced_reply = (
        '```json\n{"is_injection": true, "confidence": 0.95, '
        '"reasoning": "clear injection attempt"}\n```'
    )
    adapter = AsyncMock()
    adapter.complete.return_value = fenced_reply

    llm_classifier = InjectionClassifier(adapter, threshold=0.7)
    scanner = _scanner()
    scanner.set_classifier(llm_classifier)

    verdict = await scanner.scan_async(
        "Some sneaky content",
        trust_level="untrusted",
        scope="agent-private",
    )

    # The content must be rejected and attributed to the LLM classifier.
    assert verdict.allowed is False
    assert verdict.action == ContentAction.REJECT
    assert "llm_injection_flagged" in verdict.flags
    assert any(
        detection.detection_type == DetectionType.INJECTION_LLM
        for detection in verdict.detections
    )

@pytest.mark.asyncio
async def test_batch_write_runs_security_scan(self):
"""Verify add_batch now scans content via the scanner."""
Expand Down
Loading