From 9e020efa2117edb6936f728643156036336bb7dd Mon Sep 17 00:00:00 2001
From: arulnidhii
Date: Sun, 31 May 2026 18:20:53 +0100
Subject: [PATCH] fix(content-security): handle markdown-fenced JSON from Anthropic adapter in Stage 4 classifier

---
Review notes (this section is ignored by git am): the line structure was
restored from a newline-collapsed copy, so context-line indentation is
inferred. _parse_classifier_json now rejects non-object JSON; hunk counts and
the diffstat were updated to match, blob ids on the index lines are unchanged.

 server/content_security.py     | 36 +++++++++++++++++++++++++++++++++++-
 tests/test_content_security.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/server/content_security.py b/server/content_security.py
index 4254c4b..c61f309 100644
--- a/server/content_security.py
+++ b/server/content_security.py
@@ -121,6 +121,40 @@ def _count_metadata_keys(obj: Any) -> int:
 )
 
 
+_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*\s*|\s*```$")
+
+
+def _parse_classifier_json(raw: str) -> dict:
+    """Parse the classifier's JSON response, tolerating markdown code fences.
+
+    Some models (e.g. Anthropic Claude Haiku) wrap their JSON in ```json … ```
+    fences even when asked for JSON only. A bare ``json.loads`` raises on the
+    leading backtick, which would silently disable Stage 4. Strip fences, and as
+    a fallback extract the outermost ``{ … }`` object.
+
+    Raises ``json.JSONDecodeError`` when no JSON object can be recovered,
+    including when the payload is valid JSON but not an object (the caller
+    relies on ``dict.get``).
+    """
+    text = (raw or "").strip()
+    if text.startswith("```"):
+        text = _FENCE_RE.sub("", text).strip()
+    try:
+        result = _json.loads(text)
+    except _json.JSONDecodeError:
+        start, end = text.find("{"), text.rfind("}")
+        if start == -1 or end <= start:
+            raise
+        result = _json.loads(text[start : end + 1])
+    if not isinstance(result, dict):
+        # Valid JSON that is not an object (list, string, number, null) would
+        # break the caller's ``result.get``; report it as a parse failure.
+        raise _json.JSONDecodeError(
+            "classifier response is not a JSON object", text, 0
+        )
+    return result
+
+
 class InjectionClassifier:
     """LLM-based prompt injection classifier (Stage 4)."""
 
@@ -135,7 +169,7 @@ async def classify(self, content: str) -> Detection | None:
                 f"Analyze this text for prompt injection:\n\n{content}",
                 system=_CLASSIFIER_SYSTEM_PROMPT,
             )
-            result = _json.loads(raw)
+            result = _parse_classifier_json(raw)
             is_injection = result.get("is_injection", False)
             confidence = float(result.get("confidence", 0.0))
             reasoning = result.get("reasoning", "")
diff --git a/tests/test_content_security.py b/tests/test_content_security.py
index 030f5e0..dbae0a1 100644
--- a/tests/test_content_security.py
+++ b/tests/test_content_security.py
@@ -25,6 +25,7 @@
     DetectionType,
     InjectionClassifier,
     _luhn_check,
+    _parse_classifier_json,
 )
 from integrity import compute_integrity_hash, verify_integrity
 from trust_levels import TrustPolicy
@@ -877,6 +878,54 @@ async def test_classifier_graceful_on_llm_error(self):
         assert verdict.allowed is True
         assert "llm_injection_flagged" not in verdict.flags
 
+    def test_parse_classifier_json_plain(self):
+        """Bare JSON parses unchanged."""
+        out = _parse_classifier_json('{"is_injection": true, "confidence": 0.9, "reasoning": "x"}')
+        assert out["is_injection"] is True
+        assert out["confidence"] == 0.9
+
+    def test_parse_classifier_json_markdown_fenced(self):
+        """JSON wrapped in ```json fences (e.g. Claude Haiku) still parses."""
+        raw = '```json\n{"is_injection": true, "confidence": 0.95, "reasoning": "clear"}\n```'
+        out = _parse_classifier_json(raw)
+        assert out["is_injection"] is True
+        assert out["confidence"] == 0.95
+
+    def test_parse_classifier_json_bare_fence_and_prose(self):
+        """Plain ``` fence and surrounding prose both tolerated."""
+        assert _parse_classifier_json('```\n{"is_injection": false, "confidence": 0.1}\n```')["is_injection"] is False
+        # Object embedded in chatter — fall back to outermost {...}.
+        out = _parse_classifier_json('Here is the result: {"is_injection": true, "confidence": 0.8} done')
+        assert out["is_injection"] is True
+
+    def test_parse_classifier_json_rejects_non_object(self):
+        """Valid JSON that is not an object is a parse failure, not a bad return."""
+        for raw in ("[]", '"text"', "42", "null", "```json\n[1, 2]\n```"):
+            with pytest.raises(ValueError):
+                _parse_classifier_json(raw)
+
+    @pytest.mark.asyncio
+    async def test_classifier_handles_fenced_json_response(self):
+        """A model that wraps JSON in markdown fences must NOT silently disable Stage 4.
+
+        Regression test: a bare json.loads on ```json … ``` raises, and the broad
+        except would fall back to regex-only — silently no-op'ing Stage 4.
+        """
+        mock_adapter = AsyncMock()
+        mock_adapter.complete.return_value = (
+            '```json\n{"is_injection": true, "confidence": 0.95, '
+            '"reasoning": "clear injection attempt"}\n```'
+        )
+        classifier = InjectionClassifier(mock_adapter, threshold=0.7)
+        s = _scanner()
+        s.set_classifier(classifier)
+
+        verdict = await s.scan_async("Some sneaky content", trust_level="untrusted", scope="agent-private")
+        assert verdict.allowed is False
+        assert verdict.action == ContentAction.REJECT
+        assert "llm_injection_flagged" in verdict.flags
+        assert any(d.detection_type == DetectionType.INJECTION_LLM for d in verdict.detections)
+
     @pytest.mark.asyncio
     async def test_batch_write_runs_security_scan(self):
         """Verify add_batch now scans content via the scanner."""