From 2a16bf3c3b5668b2343dce90ad29511164024b7b Mon Sep 17 00:00:00 2001 From: kschlt Date: Thu, 26 Mar 2026 19:36:40 +0100 Subject: [PATCH 1/3] feat(enforcement): add ConflictDetector and live enforcement metadata Prevent silent failures in the enforcement pipeline: fragment-config conflicts would previously overwrite user settings without warning, and unroutable policy keys were silently dropped. ConflictDetector surfaces both categories so agents can act rather than discover drift later. _build_enforcement_metadata() derives adapter capabilities live from the registry so creation guidance and enforcement reality cannot drift apart when new adapters are added. --- adr_kit/decision/workflows/creation.py | 63 ++++ adr_kit/enforcement/conflict.py | 243 ++++++++++++++++ adr_kit/enforcement/pipeline.py | 172 ++++++++--- .../test_conflict_detection_pipeline.py | 266 +++++++++++++++++ tests/unit/test_conflict_detector.py | 275 ++++++++++++++++++ 5 files changed, 984 insertions(+), 35 deletions(-) create mode 100644 adr_kit/enforcement/conflict.py create mode 100644 tests/integration/test_conflict_detection_pipeline.py create mode 100644 tests/unit/test_conflict_detector.py diff --git a/adr_kit/decision/workflows/creation.py b/adr_kit/decision/workflows/creation.py index aab32df..69657f1 100644 --- a/adr_kit/decision/workflows/creation.py +++ b/adr_kit/decision/workflows/creation.py @@ -781,6 +781,10 @@ def _build_policy_reference(self) -> dict[str, Any]: This reference is provided just-in-time when agents need to construct structured policies, avoiding context bloat in MCP tool docstrings. + + Includes enforcement_metadata derived from the live adapter registry so + creation and enforcement cannot drift apart: when new adapters are added, + this reference automatically reflects expanded capabilities. """ return { "imports": { @@ -872,6 +876,65 @@ def _build_policy_reference(self) -> dict[str, Any]: "Better performance for I/O operations", ], }, + "enforcement_metadata": self._build_enforcement_metadata(), + } + + def _build_enforcement_metadata(self) -> dict[str, Any]: + """Build enforcement capability metadata derived from the adapter registry. + + This is derived at call-time from the live adapter registry — never + hardcoded — so that adding new adapters automatically updates this + reference without touching creation.py. + """ + from ...enforcement.adapters.eslint import ESLintAdapter + from ...enforcement.adapters.ruff import RuffAdapter + + adapters = [ESLintAdapter(), RuffAdapter()] + + # Map each policy key to the adapters that can enforce it + policy_coverage: dict[str, list[str]] = {} + adapter_details: dict[str, dict[str, Any]] = {} + + for adapter in adapters: + adapter_details[adapter.name] = { + "tool": adapter.name, + "supported_policy_keys": adapter.supported_policy_keys, + "supported_languages": adapter.supported_languages, + "output_modes": adapter.output_modes, + "supported_stages": adapter.supported_stages, + "config_targets": adapter.config_targets, + } + for key in adapter.supported_policy_keys: + policy_coverage.setdefault(key, []).append(adapter.name) + + # Flag policy keys that have no adapter (script_fallback path) + all_known_keys = [ + "imports", + "python", + "patterns", + "architecture", + "config_enforcement", + ] + for key in all_known_keys: + policy_coverage.setdefault(key, []) # Empty list = no native adapter + + enforcement_paths: dict[str, str] = {} + for key in all_known_keys: + covered_by = policy_coverage.get(key, []) + if covered_by: + enforcement_paths[key] = f"native_config via {', '.join(covered_by)}" + else: + enforcement_paths[key] = ( + "script_fallback (no native adapter — agent creates validation script)" + ) + + return { + "note": ( + "Coverage is stack-dependent: ESLint requires JS/TS project, " + "Ruff requires Python project. Unroutable policies generate fallback promptlets." + ), + "adapters": adapter_details, + "policy_enforcement_paths": enforcement_paths, } def _quick_quality_gate(self, creation_input: CreationInput) -> dict[str, Any]: diff --git a/adr_kit/enforcement/conflict.py b/adr_kit/enforcement/conflict.py new file mode 100644 index 0000000..5475cbf --- /dev/null +++ b/adr_kit/enforcement/conflict.py @@ -0,0 +1,243 @@ +"""Conflict detection for the enforcement pipeline. + +Two types of conflicts are detected: + +1. Policy-contract conflicts: A new ADR policy contradicts the existing contract. + Used by the Decision Plane (pre-approval validation) and reusable by any + decision-plane workflow (e.g., pre-approval checks). + +2. Fragment-config conflicts: A generated config fragment contradicts existing + user configuration already on disk in the target file. + Used by the enforcement pipeline before writing fragments. +""" + +import json +import re +from pathlib import Path +from typing import TYPE_CHECKING + +import toml + +from ..contract.models import ConstraintsContract +from ..core.model import PolicyModel + +if TYPE_CHECKING: + from .adapters.base import ConfigFragment + from .pipeline import EnforcementConflict + + +class ConflictDetector: + """Detects conflicts in the enforcement pipeline. + + Instances are stateless; create one per pipeline run or reuse freely. + """ + + def detect_policy_conflicts( + self, + new_policy: PolicyModel, + contract: ConstraintsContract, + ) -> list["EnforcementConflict"]: + """Detect contradictions between a new ADR policy and the existing contract. + + This is a Decision Plane utility: call it before approving an ADR to + surface architectural contradictions between decisions. The detection + logic is intentionally reusable outside the enforcement pipeline. + + Args: + new_policy: The structured policy from the ADR being approved. + contract: The current compiled ConstraintsContract. + + Returns: + List of EnforcementConflict describing each detected contradiction. + Empty list means no conflicts. + """ + from .pipeline import EnforcementConflict + + raw = contract.has_conflicts_with_policy(new_policy, adr_id="incoming") + conflicts: list[EnforcementConflict] = [] + + for description in raw: + source_adrs = self._extract_adr_ids(description) + conflicts.append( + EnforcementConflict( + adapter="policy_router", + description=description, + source_adrs=source_adrs, + ) + ) + + # Python import disallow vs existing import prefer + if new_policy.python and new_policy.python.disallow_imports: + if contract.constraints.imports and contract.constraints.imports.prefer: + for item in new_policy.python.disallow_imports: + if item in contract.constraints.imports.prefer: + source = contract._find_provenance_for_rule( + f"imports.prefer.{item}" + ) + conflicts.append( + EnforcementConflict( + adapter="policy_router", + description=( + f"New policy wants to disallow python import '{item}' " + f"but {source} prefers it" + ), + source_adrs=[source], + ) + ) + + return conflicts + + def detect_config_conflicts( + self, + fragments: list["ConfigFragment"], + project_path: Path, + ) -> list["EnforcementConflict"]: + """Detect contradictions between adapter fragments and existing user config. + + For each fragment, inspects the target file on disk. If the file exists + and contains settings that explicitly contradict what the fragment wants + to enforce, a conflict is recorded. + + Fragments with detected conflicts must NOT be written to disk; the + calling pipeline is responsible for routing them to EnforcementResult.conflicts + and surfacing them to the agent for resolution. + + Args: + fragments: In-memory fragments produced by adapters (not yet written). + project_path: Absolute path to the project root. + + Returns: + List of EnforcementConflict for fragments with contradictions. + Empty list means all fragments are safe to apply. + """ + from .pipeline import EnforcementConflict + + conflicts: list[EnforcementConflict] = [] + + for fragment in fragments: + target = project_path / fragment.target_file + if not target.exists(): + continue # Nothing on disk — no conflict possible + + existing_text = target.read_text(encoding="utf-8") + + if fragment.fragment_type == "json_file": + conflicts.extend(self._check_json_conflict(fragment, existing_text)) + elif fragment.fragment_type in ("toml_file", "toml_section"): + conflicts.extend(self._check_toml_conflict(fragment, existing_text)) + + return conflicts + + # ------------------------------------------------------------------ + # JSON conflict check (ESLint-style) + # ------------------------------------------------------------------ + + def _check_json_conflict( + self, + fragment: "ConfigFragment", + existing_text: str, + ) -> list["EnforcementConflict"]: + """ESLint: detect rules that the fragment enables but the user has disabled.""" + from .pipeline import EnforcementConflict + + try: + existing = json.loads(existing_text) + generated = json.loads(fragment.content) + except (json.JSONDecodeError, ValueError): + return [] + + conflicts: list[EnforcementConflict] = [] + existing_rules: dict = existing.get("rules", {}) + generated_rules: dict = generated.get("rules", {}) + + for rule_name, generated_level in generated_rules.items(): + existing_level = existing_rules.get(rule_name) + if existing_level is None: + continue # Rule not present in user config — no conflict + + if self._rule_is_disabled(existing_level) and self._rule_is_enabled( + generated_level + ): + conflicts.append( + EnforcementConflict( + adapter=fragment.adapter, + description=( + f"Fragment wants to enable ESLint rule '{rule_name}' " + f"in '{fragment.target_file}' but the existing config " + f"explicitly disables it. " + f"Resolve: remove the 'off' override or update the ADR policy." + ), + source_adrs=list(fragment.policy_keys), + ) + ) + + return conflicts + + # ------------------------------------------------------------------ + # TOML conflict check (Ruff-style) + # ------------------------------------------------------------------ + + def _check_toml_conflict( + self, + fragment: "ConfigFragment", + existing_text: str, + ) -> list["EnforcementConflict"]: + """Ruff: detect rules the fragment selects that the user has explicitly ignored.""" + from .pipeline import EnforcementConflict + + try: + existing = toml.loads(existing_text) + generated = toml.loads(fragment.content) + except Exception: + return [] + + conflicts: list[EnforcementConflict] = [] + + # Navigate ruff lint section (handles both [lint] and [tool.ruff.lint]) + existing_lint = self._get_ruff_lint(existing) + generated_lint = self._get_ruff_lint(generated) + + existing_ignore: set[str] = set(existing_lint.get("ignore", [])) + generated_select: set[str] = set(generated_lint.get("select", [])) + + for rule in generated_select & existing_ignore: + conflicts.append( + EnforcementConflict( + adapter=fragment.adapter, + description=( + f"Fragment wants to enforce Ruff rule '{rule}' " + f"in '{fragment.target_file}' but the existing config " + f"explicitly ignores it. " + f"Resolve: remove the rule from 'ignore' or update the ADR policy." + ), + source_adrs=list(fragment.policy_keys), + ) + ) + + return conflicts + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _rule_is_disabled(level: object) -> bool: + return level in ("off", 0, "0") + + @staticmethod + def _rule_is_enabled(level: object) -> bool: + return level not in ("off", 0, "0") + + @staticmethod + def _get_ruff_lint(config: dict) -> dict: + """Extract the ruff lint section regardless of nesting depth.""" + # [lint] at top level (adr-generated format) + if "lint" in config: + return config["lint"] + # [tool.ruff.lint] + return config.get("tool", {}).get("ruff", {}).get("lint", {}) + + @staticmethod + def _extract_adr_ids(text: str) -> list[str]: + """Extract ADR-NNNN identifiers from a conflict description string.""" + return sorted(set(re.findall(r"ADR-\d+", text))) diff --git a/adr_kit/enforcement/pipeline.py b/adr_kit/enforcement/pipeline.py index 2579615..55f5cc1 100644 --- a/adr_kit/enforcement/pipeline.py +++ b/adr_kit/enforcement/pipeline.py @@ -8,16 +8,17 @@ 1. Read MergedConstraints from the contract 2. Detect project technology stack 3. Route via PolicyRouter → select adapters for the detected stack - 4. Run selected adapters → collect ConfigFragments, write to disk + 3.5 Collect all fragments (without writing); run ConflictDetector + 4. Write conflict-free fragments to disk; skip conflicting ones + 4.5 Generate fallback promptlets for unroutable policy keys 5. Generate secondary artifacts (validation scripts, git hooks, CI workflow) 6. Return EnforcementResult envelope - -Conflict detection (CFD task) will slot between stages 3 and 4 once implemented. """ import hashlib import json from pathlib import Path +from typing import Any from pydantic import BaseModel, Field @@ -142,6 +143,7 @@ def compile( """ from .adapters.eslint import ESLintAdapter from .adapters.ruff import RuffAdapter + from .conflict import ConflictDetector from .detection.stack import StackDetector from .router import PolicyRouter @@ -164,24 +166,43 @@ def compile( router = PolicyRouter([ESLintAdapter(), RuffAdapter()]) decisions, unroutable_keys = router.route(contract, detected_stack) - # Record unroutable policy keys - for key in unroutable_keys: - result.skipped_adapters.append( - SkippedAdapter( - adapter="none", - reason=f"no adapter for policy key: {key}", - ) - ) - - # Stage 3: Run selected adapters + # Stage 3: Collect all fragments from selected adapters (without writing) + all_fragments_by_adapter: dict[str, list[object]] = {} + selected_names: set[str] = set() for decision in decisions: - self._run_adapter(decision.adapter, constraints, result) + adapter = decision.adapter + selected_names.add(adapter.name) + frags = self._collect_fragments_from_adapter(adapter, constraints, result) + all_fragments_by_adapter[adapter.name] = frags + + # Stage 3.5: Detect fragment-config conflicts before writing anything + all_fragments = [ + f for frags in all_fragments_by_adapter.values() for f in frags + ] + conflict_detector = ConflictDetector() + config_conflicts = conflict_detector.detect_config_conflicts( + all_fragments, self.project_path # type: ignore[arg-type] + ) + result.conflicts.extend(config_conflicts) + conflicting_adapters = {c.adapter for c in config_conflicts} + + # Stage 4: Write conflict-free fragments; skip adapters with conflicts + for adapter_name, fragments in all_fragments_by_adapter.items(): + if adapter_name in conflicting_adapters: + n = sum(1 for c in config_conflicts if c.adapter == adapter_name) + result.skipped_adapters.append( + SkippedAdapter( + adapter=adapter_name, + reason=f"skipped: {n} conflict(s) with existing config — see result.conflicts", + ) + ) + else: + for fragment in fragments: + self._write_fragment(fragment, result) # type: ignore[arg-type] # Record adapters not selected (skipped due to stack mismatch or no matching keys) - selected_names = {d.adapter.name for d in decisions} for adapter in router.adapters: if adapter.name not in selected_names: - # Determine why it was skipped stack_set = set(detected_stack) if not set(adapter.supported_languages) & stack_set: reason = ( @@ -194,7 +215,18 @@ def compile( SkippedAdapter(adapter=adapter.name, reason=reason) ) - # Stage 4: Generate secondary artifacts + # Stage 4.5: Generate fallback promptlets for unroutable policy keys + for key in unroutable_keys: + promptlet = self._build_fallback_promptlet(key, contract) + result.fallback_promptlets.append(promptlet) + result.skipped_adapters.append( + SkippedAdapter( + adapter="none", + reason=f"unroutable policy key '{key}': fallback promptlet generated", + ) + ) + + # Stage 5: Generate secondary artifacts self._run_script_generator(result) self._run_hook_generator(result) @@ -210,18 +242,22 @@ def compile( # Internal adapter execution # ------------------------------------------------------------------ - def _run_adapter( + def _collect_fragments_from_adapter( self, adapter: object, constraints: object, result: EnforcementResult, - ) -> None: - """Call an adapter, write its fragments to disk, and record results.""" + ) -> list[object]: + """Generate fragments from an adapter without writing to disk. + + Returns the list of ConfigFragment objects, or [] on error. + Errors are recorded in result.skipped_adapters. + """ from ..contract.models import MergedConstraints from .adapters.base import BaseAdapter if not isinstance(adapter, BaseAdapter): - return + return [] if not isinstance(constraints, MergedConstraints): result.skipped_adapters.append( SkippedAdapter( @@ -229,22 +265,10 @@ def _run_adapter( reason="invalid constraints object", ) ) - return + return [] try: - fragments = adapter.generate_fragments(constraints) - for fragment in fragments: - output_file = self.project_path / fragment.target_file - output_file.write_text(fragment.content) - result.fragments_applied.append( - AppliedFragment( - adapter=fragment.adapter, - target_file=str(output_file), - policy_keys=fragment.policy_keys, - fragment_type=fragment.fragment_type, - ) - ) - result.files_touched.append(str(output_file)) + return list(adapter.generate_fragments(constraints)) except Exception as e: result.skipped_adapters.append( SkippedAdapter( @@ -252,6 +276,84 @@ def _run_adapter( reason=f"adapter error: {e}", ) ) + return [] + + def _write_fragment( + self, + fragment: object, + result: EnforcementResult, + ) -> None: + """Write a single ConfigFragment to disk and record it in the result.""" + from .adapters.base import ConfigFragment + + if not isinstance(fragment, ConfigFragment): + return + + output_file = self.project_path / fragment.target_file + output_file.write_text(fragment.content) + result.fragments_applied.append( + AppliedFragment( + adapter=fragment.adapter, + target_file=str(output_file), + policy_keys=fragment.policy_keys, + fragment_type=fragment.fragment_type, + ) + ) + result.files_touched.append(str(output_file)) + + def _build_fallback_promptlet( + self, + policy_key: str, + contract: ConstraintsContract, + ) -> str: + """Build a JSON promptlet for an unroutable policy key. + + The promptlet instructs the calling agent to create a validation script + that enforces the policy. Scripts placed in scripts/adr-validations/ are + treated as first-class enforcement artifacts by the pipeline. + """ + # Collect constraint value and source ADRs for this policy key + constraints_dump = contract.constraints.model_dump(exclude_none=True) + constraint_value: Any = constraints_dump.get(policy_key) + + source_adrs = sorted( + { + prov.adr_id + for rule_path, prov in contract.provenance.items() + if rule_path == policy_key or rule_path.startswith(policy_key + ".") + } + ) + + promptlet = { + "unenforceable_policy": { + "policy_key": policy_key, + "constraint": constraint_value, + "source_adrs": source_adrs, + }, + "instruction": ( + f"No enforcement adapter exists for policy key '{policy_key}'. " + "Create a validation script that checks for this policy constraint." + ), + "script_requirements": { + "input": "list of file paths to check", + "output": "EnforcementReport JSON (schema: {passed, violations: [{file, message, severity}]})", + "integration": ( + "Place in scripts/adr-validations/ — " + "the enforcement pipeline picks up all scripts in that directory" + ), + }, + "example_script_structure": ( + "#!/usr/bin/env python3\n" + "import sys, json\n" + "violations = []\n" + "for path in sys.argv[1:]:\n" + " # ... check constraint ...\n" + " pass\n" + 'print(json.dumps({"passed": not violations, "violations": violations}))' + ), + } + + return json.dumps(promptlet, indent=2, default=str) def _run_script_generator(self, result: EnforcementResult) -> None: """Generate per-ADR validation scripts.""" diff --git a/tests/integration/test_conflict_detection_pipeline.py b/tests/integration/test_conflict_detection_pipeline.py new file mode 100644 index 0000000..7cf42ba --- /dev/null +++ b/tests/integration/test_conflict_detection_pipeline.py @@ -0,0 +1,266 @@ +"""Integration tests for CFD — Conflict Detection + Guided Fallback. + +Covers: +- Conflicting fragment NOT silently applied — surfaces in EnforcementResult.conflicts +- Unroutable policies generate fallback promptlets (not silently dropped) +- Clean fragments still applied when only some adapters conflict +- Pipeline with mix of routable and unroutable policies +""" + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + MergedConstraints, + PolicyProvenance, +) +from adr_kit.core.model import ImportPolicy, PatternPolicy, PythonPolicy +from adr_kit.enforcement.pipeline import EnforcementPipeline + + +def _make_contract( + constraints: MergedConstraints, + source_adrs: list[str] | None = None, + provenance: dict | None = None, + tmp_path: Path | None = None, +) -> ConstraintsContract: + metadata = ContractMetadata( + hash="test-hash", + source_adrs=source_adrs or ["ADR-0001"], + adr_directory=str(tmp_path or Path("/tmp")), + ) + return ConstraintsContract( + metadata=metadata, + constraints=constraints, + provenance=provenance or {}, + approved_adrs=[], + ) + + +# --------------------------------------------------------------------------- +# Fragment-config conflict detection in pipeline +# --------------------------------------------------------------------------- + + +class TestPipelineConflictDetection: + def test_conflicting_eslint_fragment_not_applied(self, tmp_path: Path): + """ESLint fragment conflicts with existing .eslintrc.adrs.json — not written.""" + # Pre-existing config that disables the rule the adapter wants to enable + existing_eslint = {"rules": {"no-restricted-imports": "off"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing_eslint)) + + contract = _make_contract( + constraints=MergedConstraints(imports=ImportPolicy(disallow=["axios"])), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["javascript"]) + + # Conflict must be recorded + assert len(result.conflicts) >= 1 + conflict_descriptions = " ".join(c.description for c in result.conflicts) + assert "no-restricted-imports" in conflict_descriptions + + # Fragment must NOT be applied (file content unchanged) + on_disk = json.loads((tmp_path / ".eslintrc.adrs.json").read_text()) + assert on_disk["rules"]["no-restricted-imports"] == "off" + + # ESLint must appear in skipped_adapters (conflict reason) + eslint_skipped = [s for s in result.skipped_adapters if s.adapter == "eslint"] + assert eslint_skipped, "ESLint should be in skipped_adapters due to conflict" + assert "conflict" in eslint_skipped[0].reason + + def test_no_conflict_when_existing_file_absent(self, tmp_path: Path): + """No existing config → adapter runs normally, no conflicts.""" + contract = _make_contract( + constraints=MergedConstraints(imports=ImportPolicy(disallow=["axios"])), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["javascript"]) + + assert result.conflicts == [] + eslint_applied = [f for f in result.fragments_applied if f.adapter == "eslint"] + assert len(eslint_applied) == 1 + assert (tmp_path / ".eslintrc.adrs.json").exists() + + def test_conflict_surfaces_source_adrs(self, tmp_path: Path): + """Conflict records policy_keys of the conflicting fragment.""" + existing_eslint = {"rules": {"no-restricted-imports": "off"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing_eslint)) + + prov_key = "imports.disallow.axios" + provenance = { + prov_key: PolicyProvenance( + adr_id="ADR-0001", + adr_title="No Axios", + rule_path=prov_key, + effective_date=datetime(2024, 1, 1, tzinfo=timezone.utc), + clause_id=PolicyProvenance.make_clause_id("ADR-0001", prov_key), + ) + } + contract = _make_contract( + constraints=MergedConstraints(imports=ImportPolicy(disallow=["axios"])), + provenance=provenance, + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["javascript"]) + + assert len(result.conflicts) >= 1 + # source_adrs contains the policy key that caused the conflict + all_source_adrs = [adr for c in result.conflicts for adr in c.source_adrs] + assert any("imports" in s for s in all_source_adrs) + + +# --------------------------------------------------------------------------- +# Fallback promptlet generation for unroutable policies +# --------------------------------------------------------------------------- + + +class TestPipelineFallbackPromptlets: + def test_unroutable_policy_generates_promptlet(self, tmp_path: Path): + """'patterns' key has no adapter — must generate a fallback promptlet.""" + from adr_kit.core.model import PatternRule + + contract = _make_contract( + constraints=MergedConstraints( + patterns=PatternPolicy( + patterns={ + "no_god_objects": PatternRule( + description="Classes must not exceed 500 lines", + language="python", + rule=r"^class\s+\w+", + severity="error", + ) + } + ) + ), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + # Use python stack so Ruff adapter is considered (but patterns not covered by Ruff) + result = pipeline.compile(contract=contract, detected_stack=["python"]) + + assert len(result.fallback_promptlets) >= 1 + promptlet = json.loads(result.fallback_promptlets[0]) + assert "unenforceable_policy" in promptlet + assert promptlet["unenforceable_policy"]["policy_key"] == "patterns" + assert "instruction" in promptlet + assert "script_requirements" in promptlet + assert "integration" in promptlet["script_requirements"] + + def test_routable_policy_no_promptlet(self, tmp_path: Path): + """'imports' is routable via ESLint — no fallback promptlet generated.""" + contract = _make_contract( + constraints=MergedConstraints(imports=ImportPolicy(disallow=["axios"])), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["javascript"]) + + assert result.fallback_promptlets == [] + + def test_unroutable_key_in_skipped_adapters(self, tmp_path: Path): + """Unroutable policy key is also recorded in skipped_adapters with fallback reason.""" + from adr_kit.core.model import PatternRule + + contract = _make_contract( + constraints=MergedConstraints( + patterns=PatternPolicy( + patterns={ + "check": PatternRule( + description="Check something", + language="python", + rule=r"\bfoo\b", + severity="warning", + ) + } + ) + ), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["python"]) + + fallback_skipped = [ + s + for s in result.skipped_adapters + if "unroutable" in s.reason or "fallback" in s.reason + ] + assert fallback_skipped, "Unroutable key should appear in skipped_adapters" + + def test_promptlet_includes_constraint_value(self, tmp_path: Path): + """Fallback promptlet includes the actual constraint data.""" + from adr_kit.core.model import PatternRule + + contract = _make_contract( + constraints=MergedConstraints( + patterns=PatternPolicy( + patterns={ + "no_print": PatternRule( + description="No print statements", + language="python", + rule=r"\bprint\s*\(", + severity="error", + ) + } + ) + ), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["python"]) + + assert result.fallback_promptlets + promptlet = json.loads(result.fallback_promptlets[0]) + # Constraint value should be present (patterns dict is non-None) + assert promptlet["unenforceable_policy"]["constraint"] is not None + + +# --------------------------------------------------------------------------- +# Mixed routable + unroutable policies +# --------------------------------------------------------------------------- + + +class TestPipelineMixedPolicies: + def test_routable_applied_unroutable_prompts(self, tmp_path: Path): + """Python import (Ruff) is routable; patterns is not. Both handled correctly.""" + from adr_kit.core.model import PatternRule + + contract = _make_contract( + constraints=MergedConstraints( + python=PythonPolicy(disallow_imports=["requests"]), + patterns=PatternPolicy( + patterns={ + "no_bare_except": PatternRule( + description="No bare except clauses", + language="python", + rule=r"except\s*:", + severity="error", + ) + } + ), + ), + tmp_path=tmp_path, + ) + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract, detected_stack=["python"]) + + # Ruff adapter should have applied (python disallow_imports) + ruff_applied = [f for f in result.fragments_applied if f.adapter == "ruff"] + assert len(ruff_applied) == 1 + assert (tmp_path / ".ruff-adr.toml").exists() + + # Fallback promptlet for patterns + assert len(result.fallback_promptlets) >= 1 + promptlet = json.loads(result.fallback_promptlets[0]) + assert promptlet["unenforceable_policy"]["policy_key"] == "patterns" + + # No conflicts (no existing files to conflict with) + assert result.conflicts == [] diff --git a/tests/unit/test_conflict_detector.py b/tests/unit/test_conflict_detector.py new file mode 100644 index 0000000..e29fa62 --- /dev/null +++ b/tests/unit/test_conflict_detector.py @@ -0,0 +1,275 @@ +"""Unit tests for ConflictDetector (CFD task). + +Covers: +- detect_policy_conflicts: new policy vs existing contract +- detect_config_conflicts: JSON (ESLint) and TOML (Ruff) fragment vs existing file +""" + +import json +from datetime import date, datetime, timezone +from pathlib import Path + +import pytest + +from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + MergedConstraints, + PolicyProvenance, +) +from adr_kit.core.model import ImportPolicy, PolicyModel, PythonPolicy +from adr_kit.enforcement.adapters.base import ConfigFragment +from adr_kit.enforcement.conflict import ConflictDetector + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_contract( + imports_disallow: list[str] | None = None, + imports_prefer: list[str] | None = None, + provenance: dict | None = None, +) -> ConstraintsContract: + constraints = MergedConstraints( + imports=( + ImportPolicy(disallow=imports_disallow, prefer=imports_prefer) + if (imports_disallow or imports_prefer) + else None + ), + ) + metadata = ContractMetadata( + hash="abc", + source_adrs=["ADR-0001"], + adr_directory="/fake/adr", + ) + prov = provenance or {} + return ConstraintsContract( + metadata=metadata, + constraints=constraints, + provenance=prov, + approved_adrs=[], + ) + + +def _make_eslint_fragment( + rules: dict, + policy_keys: list[str] | None = None, +) -> ConfigFragment: + content = json.dumps({"rules": rules}) + return ConfigFragment( + adapter="eslint", + target_file=".eslintrc.adrs.json", + content=content, + fragment_type="json_file", + policy_keys=policy_keys or ["imports.disallow.axios"], + ) + + +RUFF_TOML_TEMPLATE = """\ +[lint] +select = {select} +ignore = [] +""" + + +def _make_ruff_fragment( + select: list[str], + policy_keys: list[str] | None = None, +) -> ConfigFragment: + select_str = json.dumps(select) + content = f"[lint]\nselect = {select_str}\n" + return ConfigFragment( + adapter="ruff", + target_file=".ruff-adr.toml", + content=content, + fragment_type="toml_file", + policy_keys=policy_keys or ["python.disallow_imports.requests"], + ) + + +# --------------------------------------------------------------------------- +# detect_policy_conflicts +# --------------------------------------------------------------------------- + + +class TestDetectPolicyConflicts: + def test_no_conflict_when_contract_empty(self): + detector = ConflictDetector() + contract = _make_contract() + policy = PolicyModel(imports=ImportPolicy(disallow=["flask"])) + result = detector.detect_policy_conflicts(policy, contract) + assert result == [] + + def test_disallow_vs_prefer_conflict(self): + """New policy disallows 'flask' but existing contract prefers it.""" + contract = _make_contract(imports_prefer=["flask"]) + detector = ConflictDetector() + policy = PolicyModel(imports=ImportPolicy(disallow=["flask"])) + result = detector.detect_policy_conflicts(policy, contract) + assert len(result) == 1 + assert "flask" in result[0].description + + def test_prefer_vs_disallow_conflict(self): + """New policy prefers 'flask' but existing contract disallows it.""" + contract = _make_contract(imports_disallow=["flask"]) + detector = ConflictDetector() + policy = PolicyModel(imports=ImportPolicy(prefer=["flask"])) + result = detector.detect_policy_conflicts(policy, contract) + assert len(result) == 1 + assert "flask" in result[0].description + + def test_no_conflict_different_packages(self): + contract = _make_contract(imports_prefer=["fastapi"]) + detector = ConflictDetector() + policy = PolicyModel(imports=ImportPolicy(disallow=["flask"])) + result = detector.detect_policy_conflicts(policy, contract) + assert result == [] + + def test_conflict_adapter_field_is_policy_router(self): + contract = _make_contract(imports_prefer=["flask"]) + detector = ConflictDetector() + policy = PolicyModel(imports=ImportPolicy(disallow=["flask"])) + result = detector.detect_policy_conflicts(policy, contract) + assert result[0].adapter == "policy_router" + + def test_python_disallow_vs_imports_prefer_conflict(self): + """New python.disallow_imports vs existing imports.prefer.""" + contract = _make_contract(imports_prefer=["requests"]) + detector = ConflictDetector() + policy = PolicyModel(python=PythonPolicy(disallow_imports=["requests"])) + result = detector.detect_policy_conflicts(policy, contract) + assert len(result) == 1 + assert "requests" in result[0].description + + def test_no_conflict_with_no_policy(self): + contract = _make_contract(imports_disallow=["flask"]) + detector = ConflictDetector() + policy = PolicyModel() + result = detector.detect_policy_conflicts(policy, contract) + assert result == [] + + +# --------------------------------------------------------------------------- +# detect_config_conflicts — JSON / ESLint +# --------------------------------------------------------------------------- + + +class TestDetectConfigConflictsJson: + def test_no_conflict_when_target_absent(self, tmp_path: Path): + detector = ConflictDetector() + fragment = _make_eslint_fragment({"no-restricted-imports": "error"}) + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + def test_no_conflict_when_rule_not_in_existing(self, tmp_path: Path): + existing = {"rules": {"no-console": "off"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing)) + fragment = _make_eslint_fragment({"no-restricted-imports": "error"}) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + def test_conflict_when_existing_disables_rule(self, tmp_path: Path): + """Existing config has 'no-restricted-imports: off' → conflict.""" + existing = {"rules": {"no-restricted-imports": "off"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing)) + fragment = _make_eslint_fragment({"no-restricted-imports": "error"}) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert len(result) == 1 + assert "no-restricted-imports" in result[0].description + assert result[0].adapter == "eslint" + + def test_conflict_with_numeric_zero(self, tmp_path: Path): + """ESLint uses 0 for 'off'.""" + existing = {"rules": {"no-restricted-imports": 0}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing)) + fragment = _make_eslint_fragment({"no-restricted-imports": "error"}) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert len(result) == 1 + + def test_no_conflict_when_both_enabled(self, tmp_path: Path): + """Both existing and fragment enable the same rule — no conflict.""" + existing = {"rules": {"no-restricted-imports": "error"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing)) + fragment = _make_eslint_fragment({"no-restricted-imports": "error"}) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + def test_conflict_includes_policy_keys_as_source_adrs(self, tmp_path: Path): + existing = {"rules": {"no-restricted-imports": "off"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(existing)) + fragment = _make_eslint_fragment( + {"no-restricted-imports": "error"}, + policy_keys=["imports.disallow.axios"], + ) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert "imports.disallow.axios" in result[0].source_adrs + + def test_invalid_json_in_existing_ignored(self, tmp_path: Path): + (tmp_path / ".eslintrc.adrs.json").write_text("not valid json {{") + fragment = _make_eslint_fragment({"no-restricted-imports": "error"}) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + +# --------------------------------------------------------------------------- +# detect_config_conflicts — TOML / Ruff +# --------------------------------------------------------------------------- + + +class TestDetectConfigConflictsToml: + def test_no_conflict_when_target_absent(self, tmp_path: Path): + detector = ConflictDetector() + fragment = _make_ruff_fragment(["I001"]) + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + def test_no_conflict_when_ignored_set_empty(self, tmp_path: Path): + existing = "[lint]\nignore = []\n" + (tmp_path / ".ruff-adr.toml").write_text(existing) + fragment = _make_ruff_fragment(["I001"]) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + def test_conflict_when_rule_in_ignore(self, tmp_path: Path): + """Existing config ignores I001 but fragment selects it.""" + existing = '[lint]\nignore = ["I001"]\n' + (tmp_path / ".ruff-adr.toml").write_text(existing) + fragment = _make_ruff_fragment(["I001"]) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert len(result) == 1 + assert "I001" in result[0].description + assert result[0].adapter == "ruff" + + def test_no_conflict_different_rules(self, tmp_path: Path): + existing = '[lint]\nignore = ["E501"]\n' + (tmp_path / ".ruff-adr.toml").write_text(existing) + fragment = _make_ruff_fragment(["I001"]) + detector = ConflictDetector() + result = detector.detect_config_conflicts([fragment], tmp_path) + assert result == [] + + def test_multiple_fragments_multiple_conflicts(self, tmp_path: Path): + """Two fragments with conflicts — both detected.""" + eslint_existing = {"rules": {"no-restricted-imports": "off"}} + (tmp_path / ".eslintrc.adrs.json").write_text(json.dumps(eslint_existing)) + ruff_existing = '[lint]\nignore = ["I001"]\n' + (tmp_path / ".ruff-adr.toml").write_text(ruff_existing) + + fragments = [ + _make_eslint_fragment({"no-restricted-imports": "error"}), + _make_ruff_fragment(["I001"]), + ] + detector = ConflictDetector() + result = detector.detect_config_conflicts(fragments, tmp_path) + assert len(result) == 2 + adapters = {c.adapter for c in result} + assert adapters == {"eslint", "ruff"} From ce02a92dc7e3b2089ed51577be8ac646d09b057b Mon Sep 17 00:00:00 2001 From: kschlt Date: Thu, 26 Mar 2026 19:37:29 +0100 Subject: [PATCH 2/3] chore: update CHANGELOG for CFD task --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1419bf..3f2362e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- `ConflictDetector` — detects two classes of enforcement conflicts: (1) policy-contract conflicts (new ADR policy contradicts existing contract, e.g. one ADR allows Flask while another bans it) and (2) fragment-config conflicts (adapter-generated fragment contradicts existing user config on disk). Policy conflict detection is reusable by decision-plane workflows for pre-approval validation +- Guided fallback for unroutable policies — when no adapter can handle a policy key, the pipeline generates a structured promptlet instructing the agent to create a validation script, rather than silently dropping the policy. Scripts placed in `scripts/adr-validations/` are treated as first-class enforcement artifacts +- Enforcement metadata in `_build_policy_reference()` — creation workflow now shows agents which policy keys have native adapter coverage (and which tool), which fall back to scripts, and which have no enforcement path yet. Metadata is derived live from the adapter registry so creation guidance stays in sync as adapters are added +- Conflict pipeline wiring — `EnforcementPipeline.compile()` now collects all fragments before writing, runs conflict detection, writes only conflict-free fragments, and surfaces conflicting ones in `EnforcementResult.conflicts` with full context (adapter, description, implicated policy keys) - Canonical enforcement pipeline (`EnforcementPipeline`) — single entry point for all enforcement that reads exclusively from the compiled architecture contract, never from raw ADR files - `EnforcementResult` audit envelope produced on every ADR approval: tracks which config fragments were applied, which adapters were skipped and why, any conflicts detected, clause-level provenance, and an idempotency hash (same contract → identical hash) - Contract-driven ESLint adapter (`generate_eslint_config_from_contract`) — generates `no-restricted-imports` rules directly from compiled `MergedConstraints` From 8ec9b1f5cf59a84dcdf4d582e0b266a7f267d851 Mon Sep 17 00:00:00 2001 From: kschlt Date: Thu, 26 Mar 2026 20:24:55 +0100 Subject: [PATCH 3/3] fix(enforcement): resolve mypy type annotation errors in conflict detection Properly type the _get_ruff_lint return value with dict[str, Any] to satisfy warn_return_any, and remove a now-unused type: ignore on _write_fragment that became superfluous after prior type improvements. --- adr_kit/enforcement/conflict.py | 10 ++++++---- adr_kit/enforcement/pipeline.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/adr_kit/enforcement/conflict.py b/adr_kit/enforcement/conflict.py index 5475cbf..3df6414 100644 --- a/adr_kit/enforcement/conflict.py +++ b/adr_kit/enforcement/conflict.py @@ -14,7 +14,7 @@ import json import re from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import toml @@ -229,13 +229,15 @@ def _rule_is_enabled(level: object) -> bool: return level not in ("off", 0, "0") @staticmethod - def _get_ruff_lint(config: dict) -> dict: + def _get_ruff_lint(config: dict[str, Any]) -> dict[str, Any]: """Extract the ruff lint section regardless of nesting depth.""" # [lint] at top level (adr-generated format) if "lint" in config: - return config["lint"] + lint: dict[str, Any] = config["lint"] + return lint # [tool.ruff.lint] - return config.get("tool", {}).get("ruff", {}).get("lint", {}) + result: dict[str, Any] = config.get("tool", {}).get("ruff", {}).get("lint", {}) + return result @staticmethod def _extract_adr_ids(text: str) -> list[str]: diff --git a/adr_kit/enforcement/pipeline.py b/adr_kit/enforcement/pipeline.py index 55f5cc1..77d81a7 100644 --- a/adr_kit/enforcement/pipeline.py +++ b/adr_kit/enforcement/pipeline.py @@ -198,7 +198,7 @@ def compile( ) else: for fragment in fragments: - self._write_fragment(fragment, result) # type: ignore[arg-type] + self._write_fragment(fragment, result) # Record adapters not selected (skipped due to stack mismatch or no matching keys) for adapter in router.adapters: