From d4b01d2916ae52b5ba724fa6c2aa3ef22841eff9 Mon Sep 17 00:00:00 2001 From: kschlt Date: Tue, 24 Mar 2026 23:04:13 +0100 Subject: [PATCH 1/4] chore: update /close skill tracking format and add CHANGELOG step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Updated /close skill step 6 to record completed tasks via the baseline summary line instead of the Done section (matches current task-tracking.md format). Added CHANGELOG.md update step so user-facing changes are captured as part of every task close. Also added the RST module restructure entry to CHANGELOG.md under [Unreleased] — this was missing from the RST task close. --- .claude/skills/close/SKILL.md | 6 +++++- CHANGELOG.md | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.claude/skills/close/SKILL.md b/.claude/skills/close/SKILL.md index 563094c..66d0eac 100644 --- a/.claude/skills/close/SKILL.md +++ b/.claude/skills/close/SKILL.md @@ -270,9 +270,13 @@ grep "^Status:" .agent/backlog/.md **Move** backlog file to `archive/` (e.g. `backlog/CRA-*.md` → `archive/CRA-*.md`) **task-tracking.md**: - Remove the row from the Priority Queue table - - Add entry to "Done" section with date and brief summary + - Add the task's ID + ✅ to the **Baseline** summary line in the header - Remove this task's ID from "Depends On" column of any tasks that depended on it - Update test count in header if tests were added +**CHANGELOG.md** (source of truth for what changed): + - Add user-facing changes to the `[Unreleased]` section under the appropriate heading (Added/Changed/Fixed/Removed) + - Write from the user's perspective — what the feature does, not implementation details + - Skip purely internal changes (dev tooling, .agent/ updates, workflow tweaks) unless they affect the installed package ## 7. 
Smart Next-Step Suggestion (Final Step or Session Ending) diff --git a/CHANGELOG.md b/CHANGELOG.md index 381815f..17db8c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Individual ADR MCP resources (`adr://{adr_id}`) for progressive disclosure — agents fetch full ADR content on demand via `resource_uri` field ### Changed +- Internal module structure reorganized into two planes: `decision/` (workflows, gate, guidance) and `enforcement/` (adapters, validation, generation, config, detection, reporter) — no public API changes - README rewritten for user focus: problem statement, quick start, tool reference, FAQ - `ROADMAP.md` "Recent Additions" section replaced with link to this changelog - CI workflow consolidated from 13 to 8 checks: dedicated lint job (blocks tests), trimmed test matrix to `(ubuntu + macOS) × (3.11–3.13) + ubuntu-only 3.10` From f9bdf91aa76285d0725e1928d12970d30f5006eb Mon Sep 17 00:00:00 2001 From: kschlt Date: Wed, 25 Mar 2026 11:10:07 +0100 Subject: [PATCH 2/4] feat(enforcement): implement canonical enforcement pipeline (CPL) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enforcement rules must be derived from the compiled ConstraintsContract, not from raw ADR files directly. The old _apply_guardrails (stub) and _generate_enforcement_rules (called adapters on raw ADRs) in approval.py were replaced by a single EnforcementPipeline.compile() call. 
Key additions: - enforcement/pipeline.py: EnforcementResult envelope (AppliedFragment, EnforcementConflict, SkippedAdapter, ProvenanceEntry) with idempotency hash (SHA-256 of sorted outputs — same contract compiled twice yields identical hash) - generate_eslint_config_from_contract() and generate_ruff_config_from_contract() added to both adapters as the canonical contract-driven path - clause_id added to PolicyProvenance (SHA-256(adr_id:rule_path)[:12]) for traceability; populated via _make_provenance() in all merge methods - _topological_sort() replaced date-sort TODO with Kahn's algorithm using supersedes field for edges, date-sort tie-breaking, cycle-detection fallback - 27 new unit tests covering all of the above; 313 tests pass --- adr_kit/contract/merger.py | 138 ++++--- adr_kit/contract/models.py | 9 + adr_kit/decision/workflows/approval.py | 302 +++----------- adr_kit/enforcement/adapters/eslint.py | 56 +++ adr_kit/enforcement/adapters/ruff.py | 40 ++ adr_kit/enforcement/pipeline.py | 335 ++++++++++++++++ tests/unit/test_enforcement_pipeline.py | 505 ++++++++++++++++++++++++ 7 files changed, 1088 insertions(+), 297 deletions(-) create mode 100644 adr_kit/enforcement/pipeline.py create mode 100644 tests/unit/test_enforcement_pipeline.py diff --git a/adr_kit/contract/merger.py b/adr_kit/contract/merger.py index 74e929c..f5bafdf 100644 --- a/adr_kit/contract/merger.py +++ b/adr_kit/contract/merger.py @@ -20,6 +20,22 @@ from .models import MergedConstraints, PolicyProvenance +def _make_provenance( + adr_id: str, + adr_title: str, + rule_path: str, + effective_date: "datetime", +) -> PolicyProvenance: + """Create a PolicyProvenance with a deterministic clause_id.""" + return PolicyProvenance( + adr_id=adr_id, + adr_title=adr_title, + rule_path=rule_path, + effective_date=effective_date, + clause_id=PolicyProvenance.make_clause_id(adr_id, rule_path), + ) + + @dataclass class PolicyConflict: """Represents a conflict between two ADR policies.""" @@ -174,10 +190,57 
@@ def _topological_sort(self, adrs: list[ADR]) -> list[ADR]: """Sort ADRs topologically based on supersede relationships. ADRs that supersede others come later in the list, so they can override. + Uses Kahn's algorithm. Falls back to date sort for ADRs with no + supersession relationships. """ - # For now, simple sort by date (older first) - # TODO: Implement proper topological sort based on supersedes relationships - return sorted(adrs, key=lambda adr: adr.front_matter.date) + if not adrs: + return adrs + + # Build index by ID for fast lookup + by_id = {adr.front_matter.id: adr for adr in adrs} + + # Build adjacency: predecessor → set of successors + # If B supersedes A, then A must come before B (A → B edge) + successors: dict[str, set[str]] = {adr.front_matter.id: set() for adr in adrs} + in_degree: dict[str, int] = {adr.front_matter.id: 0 for adr in adrs} + + for adr in adrs: + for superseded_id in adr.front_matter.supersedes or []: + if superseded_id in by_id: + # adr supersedes superseded_id → superseded_id must come first + successors[superseded_id].add(adr.front_matter.id) + in_degree[adr.front_matter.id] += 1 + + # Kahn's algorithm — start with nodes that have no predecessors + # Tie-break with date sort for determinism + queue = sorted( + [adr for adr in adrs if in_degree[adr.front_matter.id] == 0], + key=lambda a: a.front_matter.date, + ) + result: list[ADR] = [] + + while queue: + node = queue.pop(0) + result.append(node) + for succ_id in sorted(successors[node.front_matter.id]): + in_degree[succ_id] -= 1 + if in_degree[succ_id] == 0: + succ_adr = by_id[succ_id] + # Insert in date order among ready nodes + inserted = False + for i, q in enumerate(queue): + if succ_adr.front_matter.date < q.front_matter.date: + queue.insert(i, succ_adr) + inserted = True + break + if not inserted: + queue.append(succ_adr) + + # If cycle detected (result shorter than input), fall back to date sort + if len(result) < len(adrs): + return sorted(adrs, key=lambda adr: 
adr.front_matter.date) + + return result def _merge_import_policy( self, @@ -215,11 +278,8 @@ def _merge_import_policy( merged_prefer.discard(item) merged_disallow.add(item) - provenance[f"imports.disallow.{item}"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path=f"imports.disallow.{item}", - effective_date=effective_date, + provenance[f"imports.disallow.{item}"] = _make_provenance( + adr_id, adr_title, f"imports.disallow.{item}", effective_date ) # Add new prefer items @@ -243,11 +303,8 @@ def _merge_import_policy( continue merged_prefer.add(item) - provenance[f"imports.prefer.{item}"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path=f"imports.prefer.{item}", - effective_date=effective_date, + provenance[f"imports.prefer.{item}"] = _make_provenance( + adr_id, adr_title, f"imports.prefer.{item}", effective_date ) return ( @@ -275,11 +332,8 @@ def _merge_python_policy( merged_disallow.update(new.disallow_imports) for item in new.disallow_imports: - provenance[f"python.disallow_imports.{item}"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path=f"python.disallow_imports.{item}", - effective_date=effective_date, + provenance[f"python.disallow_imports.{item}"] = _make_provenance( + adr_id, adr_title, f"python.disallow_imports.{item}", effective_date ) return ( @@ -307,11 +361,8 @@ def _merge_pattern_policy( if new.patterns: for rule_name, rule in new.patterns.items(): merged_patterns[rule_name] = rule - provenance[f"patterns.{rule_name}"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path=f"patterns.{rule_name}", - effective_date=effective_date, + provenance[f"patterns.{rule_name}"] = _make_provenance( + adr_id, adr_title, f"patterns.{rule_name}", effective_date ) return merged_patterns, provenance @@ -333,11 +384,11 @@ def _merge_architecture_policy( for boundary in new.layer_boundaries: merged_boundaries.append(boundary) provenance[f"architecture.boundaries.{boundary.rule}"] = ( - 
PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path=f"architecture.boundaries.{boundary.rule}", - effective_date=effective_date, + _make_provenance( + adr_id, + adr_title, + f"architecture.boundaries.{boundary.rule}", + effective_date, ) ) @@ -347,11 +398,11 @@ def _merge_architecture_policy( for structure in new.required_structure: merged_structures.append(structure) provenance[f"architecture.structure.{structure.path}"] = ( - PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path=f"architecture.structure.{structure.path}", - effective_date=effective_date, + _make_provenance( + adr_id, + adr_title, + f"architecture.structure.{structure.path}", + effective_date, ) ) @@ -382,11 +433,8 @@ def _merge_config_policy( merged_ts_config.update(existing.typescript.tsconfig) if new.typescript and new.typescript.tsconfig: merged_ts_config.update(new.typescript.tsconfig) - provenance["config.typescript"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path="config.typescript", - effective_date=effective_date, + provenance["config.typescript"] = _make_provenance( + adr_id, adr_title, "config.typescript", effective_date ) # Merge Python config @@ -402,19 +450,13 @@ def _merge_config_policy( if new.python: if new.python.ruff: merged_py_ruff.update(new.python.ruff) - provenance["config.python.ruff"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path="config.python.ruff", - effective_date=effective_date, + provenance["config.python.ruff"] = _make_provenance( + adr_id, adr_title, "config.python.ruff", effective_date ) if new.python.mypy: merged_py_mypy.update(new.python.mypy) - provenance["config.python.mypy"] = PolicyProvenance( - adr_id=adr_id, - adr_title=adr_title, - rule_path="config.python.mypy", - effective_date=effective_date, + provenance["config.python.mypy"] = _make_provenance( + adr_id, adr_title, "config.python.mypy", effective_date ) # Create merged config models diff --git 
a/adr_kit/contract/models.py b/adr_kit/contract/models.py index 401e279..2ec3ec4 100644 --- a/adr_kit/contract/models.py +++ b/adr_kit/contract/models.py @@ -32,6 +32,15 @@ class PolicyProvenance(BaseModel): ..., description="Path to the specific rule (e.g., 'imports.disallow.axios')" ) effective_date: datetime = Field(..., description="When this rule became active") + clause_id: str = Field( + "", + description="Deterministic 12-char identifier: sha256(adr_id:rule_path)[:12]", + ) + + @classmethod + def make_clause_id(cls, adr_id: str, rule_path: str) -> str: + """Generate a deterministic clause ID from adr_id and rule_path.""" + return hashlib.sha256(f"{adr_id}:{rule_path}".encode()).hexdigest()[:12] class ContractMetadata(BaseModel): diff --git a/adr_kit/decision/workflows/approval.py b/adr_kit/decision/workflows/approval.py index 63d7754..a61af92 100644 --- a/adr_kit/decision/workflows/approval.py +++ b/adr_kit/decision/workflows/approval.py @@ -11,9 +11,7 @@ from ...core.model import ADR from ...core.parse import find_adr_files, parse_adr_file from ...core.validate import validate_adr -from ...enforcement.adapters.eslint import generate_eslint_config -from ...enforcement.adapters.ruff import generate_ruff_config -from ...enforcement.config.manager import GuardrailManager +from ...enforcement.pipeline import EnforcementPipeline, EnforcementResult from ...index.json_index import generate_adr_index from .base import BaseWorkflow, WorkflowResult @@ -41,6 +39,7 @@ class ApprovalResult: configurations_updated: list[str] # List of config files updated warnings: list[str] # Non-blocking warnings next_steps: str # Guidance for what happens next + enforcement_result: EnforcementResult | None = None # Canonical pipeline output class ApprovalWorkflow(BaseWorkflow): @@ -106,20 +105,15 @@ def execute(self, **kwargs: Any) -> WorkflowResult: ) # Step 4: Rebuild constraints contract - contract_result = self._execute_step( + contract, contract_result = self._execute_step( 
"rebuild_constraints_contract", self._rebuild_constraints_contract ) - # Step 5: Apply guardrails - guardrail_result = self._execute_step( - "apply_guardrails", self._apply_guardrails, updated_adr - ) - - # Step 6: Generate enforcement rules - enforcement_result = self._execute_step( - "generate_enforcement_rules", - self._generate_enforcement_rules, - updated_adr, + # Step 5: Run canonical enforcement pipeline (replaces guardrails + rule generation) + enforcement_result, enforcement_summary = self._execute_step( + "enforcement_pipeline", + self._run_enforcement_pipeline, + contract, ) # Step 7: Update indexes @@ -140,8 +134,7 @@ def execute(self, **kwargs: Any) -> WorkflowResult: "new_status": "accepted", }, "contract_rebuild": contract_result, - "guardrail_application": guardrail_result, - "enforcement_generation": enforcement_result, + "enforcement_pipeline": enforcement_summary, "index_update": index_result, "codebase_validation": validation_result, } @@ -169,6 +162,7 @@ def execute(self, **kwargs: Any) -> WorkflowResult: ), warnings=approval_report.get("warnings", []), next_steps=approval_report.get("next_steps", ""), + enforcement_result=enforcement_result, ) self._complete_workflow( @@ -296,240 +290,67 @@ def _update_adr_status( ) return updated_adr - def _rebuild_constraints_contract(self) -> dict[str, Any]: - """Rebuild the constraints contract with all approved ADRs.""" + def _rebuild_constraints_contract( + self, + ) -> tuple[Any, dict[str, Any]]: + """Rebuild the constraints contract with all approved ADRs. + + Returns the contract object alongside a summary dict for reporting. 
+ """ try: builder = ConstraintsContractBuilder(adr_dir=self.adr_dir) contract = builder.build() - return { + summary = { "success": True, "approved_adrs": len(contract.approved_adrs), "constraints_exist": not contract.constraints.is_empty(), - "constraints": 1 if not contract.constraints.is_empty() else 0, "message": "Constraints contract rebuilt successfully", } + return contract, summary except Exception as e: - return { + summary = { "success": False, "error": str(e), "message": "Failed to rebuild constraints contract", } + return None, summary - def _apply_guardrails(self, adr: ADR) -> dict[str, Any]: - """Apply guardrails based on the approved ADR.""" - try: - # Apply guardrails using GuardrailManager - GuardrailManager(adr_dir=Path(self.adr_dir)) - - # This is a simplified implementation - would need to be enhanced - # to fully integrate with the GuardrailManager's apply methods - - return { - "success": True, - "guardrails_applied": 0, # Simplified for now - "configurations_updated": [], - "message": "Guardrails system initialized (simplified implementation)", - } - - except Exception as e: - return { - "success": False, - "error": str(e), - "message": "Failed to apply guardrails", - } - - def _generate_enforcement_rules(self, adr: ADR) -> dict[str, Any]: - """Generate enforcement rules (ESLint, Ruff, git hooks) from ADR policies.""" - results = {} + def _run_enforcement_pipeline( + self, contract: Any + ) -> tuple["EnforcementResult | None", dict[str, Any]]: + """Run the canonical enforcement pipeline against the compiled contract. + Returns the EnforcementResult envelope alongside a summary dict. 
+ """ try: - # Generate ESLint rules if JavaScript/TypeScript policies exist - if self._has_javascript_policies(adr): - eslint_result = self._generate_eslint_rules(adr) - results["eslint"] = eslint_result - - # Generate Ruff rules if Python policies exist - if self._has_python_policies(adr): - ruff_result = self._generate_ruff_rules(adr) - results["ruff"] = ruff_result - - # Generate standalone validation scripts - scripts_result = self._generate_validation_scripts(adr) - results["scripts"] = scripts_result - - # Always update git hooks so staged enforcement reflects new rules - hooks_result = self._update_git_hooks() - results["hooks"] = hooks_result + pipeline = EnforcementPipeline( + adr_dir=Path(self.adr_dir), project_path=Path.cwd() + ) + result = pipeline.compile( + contract=contract if contract is not None else None + ) - return { + summary: dict[str, Any] = { "success": True, - "rule_generators": list(results.keys()), - "details": results, - "message": "Enforcement rules generated successfully", + "fragments_applied": len(result.fragments_applied), + "files_touched": result.files_touched, + "conflicts": len(result.conflicts), + "skipped_adapters": [s.adapter for s in result.skipped_adapters], + "idempotency_hash": result.idempotency_hash, + "provenance_entries": len(result.provenance), + "message": "Enforcement pipeline completed", } + return result, summary except Exception as e: - return { + summary = { "success": False, "error": str(e), - "message": "Failed to generate enforcement rules", + "message": "Enforcement pipeline failed", } - - def _generate_validation_scripts(self, adr: ADR) -> dict[str, Any]: - """Generate standalone validation scripts for an ADR's policies.""" - try: - from ...enforcement.generation.scripts import ScriptGenerator - - generator = ScriptGenerator(adr_dir=self.adr_dir) - output_dir = Path.cwd() / "scripts" / "adr" - path = generator.generate_for_adr(adr, output_dir) - - if path: - return { - "success": True, - "script": 
str(path), - "message": f"Validation script generated: {path.name}", - } - return { - "success": True, - "script": None, - "message": "No enforceable policies — no script generated", - } - except Exception as e: - return { - "success": False, - "error": str(e), - "message": "Failed to generate validation script", - } - - def _update_git_hooks(self) -> dict[str, Any]: - """Update git hooks to run staged enforcement checks.""" - try: - from ...enforcement.generation.hooks import HookGenerator - - generator = HookGenerator() - hook_results = generator.generate(project_root=Path.cwd()) - - updated = [ - name - for name, action in hook_results.items() - if action not in ("unchanged", "skipped") - ] - skipped = [ - name for name, action in hook_results.items() if "skipped" in action - ] - - return { - "success": True, - "hooks_updated": updated, - "hooks_skipped": skipped, - "details": hook_results, - "message": f"Git hooks updated: {', '.join(updated) if updated else 'all unchanged'}", - } - except Exception as e: - return { - "success": False, - "error": str(e), - "message": "Failed to update git hooks (non-blocking)", - } - - def _has_javascript_policies(self, adr: ADR) -> bool: - """Check if ADR has JavaScript/TypeScript related policies.""" - if not adr.policy: - return False - - # Check for import restrictions, frontend policies, etc. 
- js_indicators = [] - - # Check if it has imports policy - if adr.policy.imports: - js_indicators.append(True) - - # Check for frontend-related terms in policy - policy_text = str(adr.policy.model_dump()).lower() - js_indicators.extend( - [ - "javascript" in policy_text, - "typescript" in policy_text, - "frontend" in policy_text, - "react" in policy_text, - "vue" in policy_text, - ] - ) - - return any(js_indicators) - - def _has_python_policies(self, adr: ADR) -> bool: - """Check if ADR has Python related policies.""" - if not adr.policy: - return False - - # Check for Python-specific policies - python_indicators = [] - - # Check for python-specific policy - if adr.policy.python: - python_indicators.append(True) - - # Check for imports policy - if adr.policy.imports: - python_indicators.append(True) - - # Check for Python-related terms in policy - policy_text = str(adr.policy.model_dump()).lower() - python_indicators.extend( - [ - "django" in policy_text, - "flask" in policy_text, - ] - ) - - return any(python_indicators) - - def _generate_eslint_rules(self, adr: ADR) -> dict[str, Any]: - """Generate ESLint rules from ADR policies.""" - try: - config = generate_eslint_config(self.adr_dir) - - # Write to .eslintrc.adrs.json - output_file = Path.cwd() / ".eslintrc.adrs.json" - with open(output_file, "w") as f: - f.write(config) - rules: list[dict[str, Any]] = [] # Simplified for now - - return { - "success": True, - "rules_generated": len(rules), - "output_file": str(output_file), - "rules": rules, - } - - except Exception as e: - return {"success": False, "error": str(e)} - - def _generate_ruff_rules(self, adr: ADR) -> dict[str, Any]: - """Generate Ruff configuration from ADR policies.""" - try: - config_content = generate_ruff_config(self.adr_dir) - - # Update pyproject.toml - output_file = Path.cwd() / "pyproject.toml" - # For now, just create a simple config file - with open(output_file, "a") as f: - f.write("\n" + config_content) - config: dict[str, Any] = {} # 
Simplified for now - - return { - "success": True, - "config_sections": len(config), - "output_file": str(output_file), - "config": config, - } - - except Exception as e: - return {"success": False, "error": str(e)} + return None, summary def _update_indexes(self) -> dict[str, Any]: """Update JSON and other indexes after ADR approval.""" @@ -635,17 +456,10 @@ def _quick_scan_for_violations( def _count_policy_rules_applied(self, automation_results: dict[str, Any]) -> int: """Count total policy rules applied across all systems.""" - count = 0 - - if "enforcement_generation" in automation_results: - enforcement = automation_results["enforcement_generation"] - if enforcement.get("success"): - details = enforcement.get("details", {}) - for _system, result in details.items(): - if result.get("success"): - count += result.get("rules_generated", 0) - - return count + pipeline = automation_results.get("enforcement_pipeline", {}) + if pipeline.get("success"): + return pipeline.get("fragments_applied", 0) + return 0 def _extract_updated_configurations( self, automation_results: dict[str, Any] @@ -653,20 +467,10 @@ def _extract_updated_configurations( """Extract list of configuration files that were updated.""" updated_files = [] - # From guardrail application - if "guardrail_application" in automation_results: - guardrails = automation_results["guardrail_application"] - if guardrails.get("success"): - updated_files.extend(guardrails.get("configurations_updated", [])) - - # From enforcement rule generation - if "enforcement_generation" in automation_results: - enforcement = automation_results["enforcement_generation"] - if enforcement.get("success"): - details = enforcement.get("details", {}) - for _system, result in details.items(): - if result.get("success") and result.get("output_file"): - updated_files.append(result["output_file"]) + # From enforcement pipeline + pipeline = automation_results.get("enforcement_pipeline", {}) + if pipeline.get("success"): + 
updated_files.extend(pipeline.get("files_touched", [])) # From index updates if "index_update" in automation_results: diff --git a/adr_kit/enforcement/adapters/eslint.py b/adr_kit/enforcement/adapters/eslint.py index 3673add..fdcd3f7 100644 --- a/adr_kit/enforcement/adapters/eslint.py +++ b/adr_kit/enforcement/adapters/eslint.py @@ -348,6 +348,62 @@ def generate_eslint_config(adr_directory: Path | str = "docs/adr") -> str: return json.dumps(config, indent=2) +def generate_eslint_config_from_contract(constraints: Any) -> ESLintConfig: + """Generate ESLint configuration from a compiled MergedConstraints object. + + This is the canonical contract-driven path. It reads only from the + compiled contract and never touches raw ADR files. + + Args: + constraints: MergedConstraints from the ConstraintsContract + + Returns: + ESLint configuration dictionary + """ + from datetime import datetime + + config: ESLintConfig = { + "rules": {}, + "settings": {}, + "env": {}, + "extends": [], + "__adr_metadata": { + "generated_by": "ADR Kit", + "source_adrs": [], + "generation_timestamp": datetime.now().isoformat(), + "preferred_libraries": None, + }, + } + + banned_imports: list[dict[str, str]] = [] + preferred_mappings: dict[str, str] = {} + + if constraints.imports: + if constraints.imports.disallow: + for lib in constraints.imports.disallow: + banned_imports.append( + { + "name": lib, + "message": f"Import of '{lib}' is not allowed per architecture contract", + } + ) + + if constraints.imports.prefer: + for lib in constraints.imports.prefer: + preferred_mappings[lib] = "preferred per architecture contract" + + if banned_imports: + config["rules"]["no-restricted-imports"] = [ + "error", + {"paths": banned_imports}, + ] + + if preferred_mappings: + config["__adr_metadata"]["preferred_libraries"] = preferred_mappings + + return config + + def generate_eslint_overrides( adr_directory: Path | str = "docs/adr", ) -> dict[str, Any]: diff --git a/adr_kit/enforcement/adapters/ruff.py 
b/adr_kit/enforcement/adapters/ruff.py index 83dfdfe..0f0879f 100644 --- a/adr_kit/enforcement/adapters/ruff.py +++ b/adr_kit/enforcement/adapters/ruff.py @@ -245,6 +245,46 @@ def generate_ruff_config(adr_directory: Path | str = "docs/adr") -> str: return toml.dumps(ruff_config) +def generate_ruff_config_from_contract(constraints: Any) -> str: + """Generate Ruff configuration from a compiled MergedConstraints object. + + This is the canonical contract-driven path. Reads only from the compiled + contract — never from raw ADR files. + + Args: + constraints: MergedConstraints from the ConstraintsContract + + Returns: + TOML string with Ruff configuration + """ + banned_imports: set[str] = set() + + # Python-specific disallow list + if constraints.python and constraints.python.disallow_imports: + banned_imports.update(constraints.python.disallow_imports) + + # Generic import disallow (applies to Python too) + if constraints.imports and constraints.imports.disallow: + banned_imports.update(constraints.imports.disallow) + + ruff_config: dict[str, Any] = { + "target-version": "py310", + "line-length": 88, + "select": ["E", "W", "F"], + "extend-ignore": [], + } + + if banned_imports: + # Ruff's flake8-import-conventions banned-from list + ruff_config["lint"] = { + "flake8-import-conventions": { + "banned-from": sorted(banned_imports), + } + } + + return toml.dumps({"tool": {"ruff": ruff_config}}) + + def generate_import_linter_config(adr_directory: Path | str = "docs/adr") -> str: """Generate import-linter configuration from ADRs. diff --git a/adr_kit/enforcement/pipeline.py b/adr_kit/enforcement/pipeline.py new file mode 100644 index 0000000..05154ce --- /dev/null +++ b/adr_kit/enforcement/pipeline.py @@ -0,0 +1,335 @@ +"""Canonical Enforcement Pipeline. + +Single entry point for all enforcement compilation. Reads from the compiled +ConstraintsContract and never from raw ADRs. Produces a stable EnforcementResult +audit artifact on every run. + +Pipeline stages: + 1. 
Read MergedConstraints from the contract + 2. Generate native fragments (ESLint, Ruff) + 3. Generate secondary artifacts (validation scripts, git hooks, CI workflow) + 4. Return EnforcementResult envelope + +Conflict detection (CFD task) will slot between stages 2 and 3 once implemented. +Router/adapter selection (RTR task) will replace the hard-coded adapter calls. +""" + +import hashlib +import json +from pathlib import Path + +from pydantic import BaseModel, Field + +from ..contract.builder import ConstraintsContractBuilder +from ..contract.models import ConstraintsContract + + +class AppliedFragment(BaseModel): + """A config fragment that was successfully written to disk.""" + + adapter: str = Field(..., description="Adapter name, e.g. 'eslint', 'ruff'") + target_file: str = Field(..., description="Path of the file that was written") + policy_keys: list[str] = Field( + default_factory=list, + description="Contract policy keys that this fragment covers", + ) + fragment_type: str = Field( + ..., description="Fragment format, e.g. 'json_file', 'toml_section'" + ) + + +class EnforcementConflict(BaseModel): + """A contradiction detected between fragments or with existing user config.""" + + adapter: str = Field(..., description="Adapter that detected the conflict") + description: str = Field(..., description="Human-readable conflict description") + source_adrs: list[str] = Field( + default_factory=list, description="ADR IDs whose policies conflict" + ) + + +class SkippedAdapter(BaseModel): + """An adapter that did not run.""" + + adapter: str = Field(..., description="Adapter name") + reason: str = Field( + ..., description="Why it was skipped, e.g. 'no matching policy keys'" + ) + + +class ProvenanceEntry(BaseModel): + """Maps one contract rule back to its source ADR and clause.""" + + rule: str = Field(..., description="Policy key, e.g. 
'imports.disallow.axios'") + source_adr_id: str = Field(..., description="ADR that defined this rule") + clause_id: str = Field( + ..., description="Deterministic 12-char clause identifier from contract" + ) + artifact_refs: list[str] = Field( + default_factory=list, + description="Files/fragments generated from this rule (populated by adapters)", + ) + + +class EnforcementResult(BaseModel): + """Stable audit artifact produced by every enforcement compilation run.""" + + fragments_applied: list[AppliedFragment] = Field(default_factory=list) + files_touched: list[str] = Field(default_factory=list) + conflicts: list[EnforcementConflict] = Field(default_factory=list) + skipped_adapters: list[SkippedAdapter] = Field(default_factory=list) + fallback_promptlets: list[str] = Field(default_factory=list) + provenance: list[ProvenanceEntry] = Field(default_factory=list) + idempotency_hash: str = Field( + "", description="SHA-256 of all outputs — identical on re-run with same inputs" + ) + + def compute_idempotency_hash(self) -> str: + """Compute and store the idempotency hash from current outputs.""" + payload = { + "fragments": sorted( + [f.model_dump() for f in self.fragments_applied], + key=lambda x: (x["adapter"], x["target_file"]), + ), + "files_touched": sorted(self.files_touched), + "conflicts": sorted( + [c.model_dump() for c in self.conflicts], key=lambda x: x["adapter"] + ), + "skipped_adapters": sorted( + [s.model_dump() for s in self.skipped_adapters], + key=lambda x: x["adapter"], + ), + "fallback_promptlets": sorted(self.fallback_promptlets), + "provenance": sorted( + [p.model_dump() for p in self.provenance], key=lambda x: x["rule"] + ), + } + hash_str = hashlib.sha256( + json.dumps(payload, sort_keys=True, default=str).encode() + ).hexdigest() + self.idempotency_hash = hash_str + return hash_str + + +class EnforcementPipeline: + """Compiles a ConstraintsContract into enforcement artifacts. + + This is the single entry point for all enforcement. 
Callers pass the + already-built contract; the pipeline reads constraints from it and never + touches raw ADR files. + """ + + def __init__(self, adr_dir: Path, project_path: Path | None = None) -> None: + self.adr_dir = adr_dir + self.project_path = project_path or Path.cwd() + + def compile(self, contract: ConstraintsContract | None = None) -> EnforcementResult: + """Run the full enforcement pipeline and return a result envelope. + + Args: + contract: Pre-built contract. If None, builds from adr_dir. + + Returns: + EnforcementResult with fragments applied, conflicts, provenance, and hash. + """ + if contract is None: + builder = ConstraintsContractBuilder(adr_dir=self.adr_dir) + contract = builder.build() + + constraints = contract.constraints + + result = EnforcementResult() + + # Build provenance index from contract + provenance_index = self._build_provenance_index(contract) + result.provenance = list(provenance_index.values()) + + # Stage 1: Generate native fragments + self._run_eslint_adapter(constraints, result) + self._run_ruff_adapter(constraints, result) + + # Stage 2: Generate secondary artifacts + self._run_script_generator(result) + self._run_hook_generator(result) + + # Deduplicate files_touched + result.files_touched = sorted(set(result.files_touched)) + + # Compute idempotency hash + result.compute_idempotency_hash() + + return result + + # ------------------------------------------------------------------ + # Internal adapter calls + # ------------------------------------------------------------------ + + def _run_eslint_adapter( + self, constraints: object, result: EnforcementResult + ) -> None: + """Run the ESLint adapter if JS/TS constraints are present.""" + from ..contract.models import MergedConstraints + + if not isinstance(constraints, MergedConstraints): + result.skipped_adapters.append( + SkippedAdapter(adapter="eslint", reason="invalid constraints object") + ) + return + + has_js_constraints = bool(constraints.imports) + if not 
has_js_constraints: + result.skipped_adapters.append( + SkippedAdapter( + adapter="eslint", + reason="no matching policy keys (no imports policy)", + ) + ) + return + + try: + import json as _json + + from .adapters.eslint import generate_eslint_config_from_contract + + config = generate_eslint_config_from_contract(constraints) + output_file = self.project_path / ".eslintrc.adrs.json" + output_file.write_text(_json.dumps(config, indent=2)) + + policy_keys = [] + if constraints.imports and constraints.imports.disallow: + policy_keys.extend( + [f"imports.disallow.{x}" for x in constraints.imports.disallow] + ) + if constraints.imports and constraints.imports.prefer: + policy_keys.extend( + [f"imports.prefer.{x}" for x in constraints.imports.prefer] + ) + + result.fragments_applied.append( + AppliedFragment( + adapter="eslint", + target_file=str(output_file), + policy_keys=policy_keys, + fragment_type="json_file", + ) + ) + result.files_touched.append(str(output_file)) + + except Exception as e: + result.skipped_adapters.append( + SkippedAdapter(adapter="eslint", reason=f"adapter error: {e}") + ) + + def _run_ruff_adapter(self, constraints: object, result: EnforcementResult) -> None: + """Run the Ruff adapter if Python constraints are present.""" + from ..contract.models import MergedConstraints + + if not isinstance(constraints, MergedConstraints): + result.skipped_adapters.append( + SkippedAdapter(adapter="ruff", reason="invalid constraints object") + ) + return + + has_python_constraints = bool(constraints.python or constraints.imports) + if not has_python_constraints: + result.skipped_adapters.append( + SkippedAdapter( + adapter="ruff", + reason="no matching policy keys (no python or imports policy)", + ) + ) + return + + try: + from .adapters.ruff import generate_ruff_config_from_contract + + config_toml = generate_ruff_config_from_contract(constraints) + output_file = self.project_path / ".ruff-adr.toml" + output_file.write_text(config_toml) + + policy_keys = [] 
+ if constraints.python and constraints.python.disallow_imports: + policy_keys.extend( + [ + f"python.disallow_imports.{x}" + for x in constraints.python.disallow_imports + ] + ) + if constraints.imports and constraints.imports.disallow: + policy_keys.extend( + [f"imports.disallow.{x}" for x in constraints.imports.disallow] + ) + + result.fragments_applied.append( + AppliedFragment( + adapter="ruff", + target_file=str(output_file), + policy_keys=policy_keys, + fragment_type="toml_file", + ) + ) + result.files_touched.append(str(output_file)) + + except Exception as e: + result.skipped_adapters.append( + SkippedAdapter(adapter="ruff", reason=f"adapter error: {e}") + ) + + def _run_script_generator(self, result: EnforcementResult) -> None: + """Generate per-ADR validation scripts.""" + try: + from ..core.model import ADRStatus + from ..core.parse import find_adr_files, parse_adr_file + from .generation.scripts import ScriptGenerator + + generator = ScriptGenerator(adr_dir=self.adr_dir) + output_dir = self.project_path / "scripts" / "adr" + + adr_files = find_adr_files(self.adr_dir) + for file_path in adr_files: + try: + adr = parse_adr_file(file_path, strict=False) + if adr and adr.front_matter.status == ADRStatus.ACCEPTED: + path = generator.generate_for_adr(adr, output_dir) + if path: + result.files_touched.append(str(path)) + except Exception: + continue + + except Exception as e: + result.skipped_adapters.append( + SkippedAdapter(adapter="script_generator", reason=f"error: {e}") + ) + + def _run_hook_generator(self, result: EnforcementResult) -> None: + """Update git hooks for staged enforcement.""" + try: + from .generation.hooks import HookGenerator + + generator = HookGenerator() + hook_results = generator.generate(project_root=self.project_path) + + for name, action in hook_results.items(): + if action not in ("unchanged", "skipped"): + result.files_touched.append( + str(self.project_path / ".git" / "hooks" / name) + ) + + except Exception as e: + 
result.skipped_adapters.append( + SkippedAdapter(adapter="hook_generator", reason=f"error: {e}") + ) + + def _build_provenance_index( + self, contract: ConstraintsContract + ) -> dict[str, ProvenanceEntry]: + """Convert contract provenance into ProvenanceEntry objects.""" + index: dict[str, ProvenanceEntry] = {} + for rule_path, prov in contract.provenance.items(): + index[rule_path] = ProvenanceEntry( + rule=rule_path, + source_adr_id=prov.adr_id, + clause_id=prov.clause_id, + artifact_refs=[], + ) + return index diff --git a/tests/unit/test_enforcement_pipeline.py b/tests/unit/test_enforcement_pipeline.py new file mode 100644 index 0000000..ac599ec --- /dev/null +++ b/tests/unit/test_enforcement_pipeline.py @@ -0,0 +1,505 @@ +"""Unit tests for the canonical enforcement pipeline (CPL task). + +Covers: +- EnforcementResult model and idempotency hash +- ESLint/Ruff contract-driven adapter functions +- PolicyProvenance clause_id generation +- Topological sort in PolicyMerger +""" + +import hashlib +from datetime import date, datetime, timezone + +import pytest + +from adr_kit.contract.models import MergedConstraints, PolicyProvenance +from adr_kit.enforcement.pipeline import ( + AppliedFragment, + EnforcementConflict, + EnforcementResult, + ProvenanceEntry, + SkippedAdapter, +) + +# --------------------------------------------------------------------------- +# PolicyProvenance clause_id tests +# --------------------------------------------------------------------------- + + +class TestClauseIdGeneration: + def test_clause_id_is_12_chars(self): + cid = PolicyProvenance.make_clause_id("ADR-0001", "imports.disallow.axios") + assert len(cid) == 12 + + def test_clause_id_is_stable(self): + """Same inputs always produce the same clause_id.""" + cid1 = PolicyProvenance.make_clause_id("ADR-0001", "imports.disallow.axios") + cid2 = PolicyProvenance.make_clause_id("ADR-0001", "imports.disallow.axios") + assert cid1 == cid2 + + def test_clause_id_differs_by_adr(self): + cid1 = 
PolicyProvenance.make_clause_id("ADR-0001", "imports.disallow.axios") + cid2 = PolicyProvenance.make_clause_id("ADR-0002", "imports.disallow.axios") + assert cid1 != cid2 + + def test_clause_id_differs_by_rule_path(self): + cid1 = PolicyProvenance.make_clause_id("ADR-0001", "imports.disallow.axios") + cid2 = PolicyProvenance.make_clause_id("ADR-0001", "imports.disallow.moment") + assert cid1 != cid2 + + def test_provenance_has_clause_id_via_merger(self): + """Merger populates clause_id on every provenance entry.""" + from adr_kit.contract.merger import PolicyMerger + from adr_kit.core.model import ( + ADR, + ADRFrontMatter, + ADRStatus, + ImportPolicy, + PolicyModel, + ) + + front_matter = ADRFrontMatter( + id="ADR-0001", + title="Use React Query", + status=ADRStatus.ACCEPTED, + date=date(2024, 1, 1), + deciders=["team"], + tags=["frontend"], + policy=PolicyModel( + imports=ImportPolicy(disallow=["axios"], prefer=["react-query"]) + ), + ) + adr = ADR(front_matter=front_matter, content="## Decision\nUse it.") + + merger = PolicyMerger() + result = merger.merge_policies([adr]) + + for prov in result.provenance.values(): + assert len(prov.clause_id) == 12 + assert prov.clause_id != "" + + +# --------------------------------------------------------------------------- +# Topological sort tests +# --------------------------------------------------------------------------- + + +class TestTopologicalSort: + def _make_adr(self, id_: str, date_: date, supersedes: list[str] | None = None): + from adr_kit.core.model import ADR, ADRFrontMatter, ADRStatus + + return ADR( + front_matter=ADRFrontMatter( + id=id_, + title=f"ADR {id_}", + status=ADRStatus.ACCEPTED, + date=date_, + deciders=["team"], + supersedes=supersedes, + ), + content="", + ) + + def test_no_supersession_falls_back_to_date_sort(self): + from adr_kit.contract.merger import PolicyMerger + + merger = PolicyMerger() + adrs = [ + self._make_adr("ADR-0003", date(2024, 3, 1)), + self._make_adr("ADR-0001", date(2024, 1, 
1)), + self._make_adr("ADR-0002", date(2024, 2, 1)), + ] + sorted_adrs = merger._topological_sort(adrs) + ids = [a.front_matter.id for a in sorted_adrs] + assert ids == ["ADR-0001", "ADR-0002", "ADR-0003"] + + def test_linear_supersession_chain(self): + """ADR-002 supersedes ADR-001 → ADR-001 must come first.""" + from adr_kit.contract.merger import PolicyMerger + + merger = PolicyMerger() + adrs = [ + self._make_adr("ADR-0002", date(2024, 2, 1), supersedes=["ADR-0001"]), + self._make_adr("ADR-0001", date(2024, 1, 1)), + ] + sorted_adrs = merger._topological_sort(adrs) + ids = [a.front_matter.id for a in sorted_adrs] + assert ids.index("ADR-0001") < ids.index("ADR-0002") + + def test_diamond_supersession(self): + """ADR-003 supersedes both ADR-001 and ADR-002 — both must come before ADR-003.""" + from adr_kit.contract.merger import PolicyMerger + + merger = PolicyMerger() + adrs = [ + self._make_adr( + "ADR-0003", date(2024, 3, 1), supersedes=["ADR-0001", "ADR-0002"] + ), # noqa + self._make_adr("ADR-0001", date(2024, 1, 1)), + self._make_adr("ADR-0002", date(2024, 2, 1)), + ] + sorted_adrs = merger._topological_sort(adrs) + ids = [a.front_matter.id for a in sorted_adrs] + assert ids.index("ADR-0001") < ids.index("ADR-0003") + assert ids.index("ADR-0002") < ids.index("ADR-0003") + + def test_supersession_not_in_list_is_ignored(self): + """supersedes references to ADRs not in the input list are safely ignored.""" + from adr_kit.contract.merger import PolicyMerger + + merger = PolicyMerger() + adrs = [ + self._make_adr("ADR-0002", date(2024, 2, 1), supersedes=["ADR-9999"]), + self._make_adr("ADR-0001", date(2024, 1, 1)), + ] + sorted_adrs = merger._topological_sort(adrs) + assert len(sorted_adrs) == 2 + + def test_empty_list(self): + from adr_kit.contract.merger import PolicyMerger + + merger = PolicyMerger() + assert merger._topological_sort([]) == [] + + +# --------------------------------------------------------------------------- +# EnforcementResult model tests 
+# --------------------------------------------------------------------------- + + +class TestEnforcementResult: + def test_empty_result_has_empty_hash(self): + result = EnforcementResult() + assert result.idempotency_hash == "" + + def test_compute_hash_populates_field(self): + result = EnforcementResult() + h = result.compute_idempotency_hash() + assert len(h) == 64 # SHA-256 hex + assert result.idempotency_hash == h + + def test_same_result_same_hash(self): + """Identical outputs always produce identical hash.""" + r1 = EnforcementResult( + fragments_applied=[ + AppliedFragment( + adapter="eslint", + target_file="/project/.eslintrc.adrs.json", + policy_keys=["imports.disallow.axios"], + fragment_type="json_file", + ) + ], + files_touched=["/project/.eslintrc.adrs.json"], + ) + r2 = EnforcementResult( + fragments_applied=[ + AppliedFragment( + adapter="eslint", + target_file="/project/.eslintrc.adrs.json", + policy_keys=["imports.disallow.axios"], + fragment_type="json_file", + ) + ], + files_touched=["/project/.eslintrc.adrs.json"], + ) + h1 = r1.compute_idempotency_hash() + h2 = r2.compute_idempotency_hash() + assert h1 == h2 + + def test_different_fragments_different_hash(self): + r1 = EnforcementResult( + fragments_applied=[ + AppliedFragment( + adapter="eslint", + target_file="/project/.eslintrc.adrs.json", + policy_keys=["imports.disallow.axios"], + fragment_type="json_file", + ) + ] + ) + r2 = EnforcementResult( + fragments_applied=[ + AppliedFragment( + adapter="ruff", + target_file="/project/.ruff-adr.toml", + policy_keys=["python.disallow_imports.requests"], + fragment_type="toml_file", + ) + ] + ) + assert r1.compute_idempotency_hash() != r2.compute_idempotency_hash() + + def test_skipped_adapters_included_in_hash(self): + r1 = EnforcementResult( + skipped_adapters=[SkippedAdapter(adapter="ruff", reason="no python policy")] + ) + r2 = EnforcementResult( + skipped_adapters=[ + SkippedAdapter(adapter="eslint", reason="no imports policy") + ] + ) + assert 
r1.compute_idempotency_hash() != r2.compute_idempotency_hash() + + +# --------------------------------------------------------------------------- +# ESLint contract-driven adapter +# --------------------------------------------------------------------------- + + +class TestESLintContractAdapter: + def _make_constraints(self, disallow=None, prefer=None): + from adr_kit.core.model import ImportPolicy + + return MergedConstraints(imports=ImportPolicy(disallow=disallow, prefer=prefer)) + + def test_generates_no_restricted_imports_rule(self): + from adr_kit.enforcement.adapters.eslint import ( + generate_eslint_config_from_contract, + ) + + constraints = self._make_constraints(disallow=["axios", "moment"]) + config = generate_eslint_config_from_contract(constraints) + + assert "no-restricted-imports" in config["rules"] + paths = config["rules"]["no-restricted-imports"][1]["paths"] + names = {p["name"] for p in paths} + assert "axios" in names + assert "moment" in names + + def test_prefer_goes_to_metadata(self): + from adr_kit.enforcement.adapters.eslint import ( + generate_eslint_config_from_contract, + ) + + constraints = self._make_constraints(prefer=["react-query"]) + config = generate_eslint_config_from_contract(constraints) + + assert config["__adr_metadata"]["preferred_libraries"] is not None + assert "react-query" in config["__adr_metadata"]["preferred_libraries"] + + def test_empty_constraints_produces_no_rules(self): + from adr_kit.enforcement.adapters.eslint import ( + generate_eslint_config_from_contract, + ) + + constraints = MergedConstraints() + config = generate_eslint_config_from_contract(constraints) + + assert "no-restricted-imports" not in config["rules"] + + def test_has_metadata_timestamp(self): + from adr_kit.enforcement.adapters.eslint import ( + generate_eslint_config_from_contract, + ) + + constraints = self._make_constraints(disallow=["axios"]) + config = generate_eslint_config_from_contract(constraints) + + assert 
config["__adr_metadata"]["generation_timestamp"] is not None + + +# --------------------------------------------------------------------------- +# Ruff contract-driven adapter +# --------------------------------------------------------------------------- + + +class TestRuffContractAdapter: + def test_python_disallow_goes_to_banned_from(self): + import toml + + from adr_kit.core.model import PythonPolicy + from adr_kit.enforcement.adapters.ruff import generate_ruff_config_from_contract + + constraints = MergedConstraints( + python=PythonPolicy(disallow_imports=["requests", "httpx"]) + ) + toml_str = generate_ruff_config_from_contract(constraints) + config = toml.loads(toml_str) + + banned = config["tool"]["ruff"]["lint"]["flake8-import-conventions"][ + "banned-from" + ] + assert "requests" in banned + assert "httpx" in banned + + def test_imports_disallow_also_goes_to_banned_from(self): + import toml + + from adr_kit.core.model import ImportPolicy + from adr_kit.enforcement.adapters.ruff import generate_ruff_config_from_contract + + constraints = MergedConstraints(imports=ImportPolicy(disallow=["moment"])) + toml_str = generate_ruff_config_from_contract(constraints) + config = toml.loads(toml_str) + + banned = config["tool"]["ruff"]["lint"]["flake8-import-conventions"][ + "banned-from" + ] + assert "moment" in banned + + def test_empty_constraints_no_lint_section(self): + import toml + + from adr_kit.enforcement.adapters.ruff import generate_ruff_config_from_contract + + constraints = MergedConstraints() + toml_str = generate_ruff_config_from_contract(constraints) + config = toml.loads(toml_str) + + ruff_cfg = config["tool"]["ruff"] + assert "lint" not in ruff_cfg + + +# --------------------------------------------------------------------------- +# EnforcementPipeline integration (no real files needed) +# --------------------------------------------------------------------------- + + +class TestEnforcementPipeline: + def 
test_pipeline_skips_adapters_when_no_constraints(self, tmp_path): + """When the contract has no constraints, both adapters are skipped.""" + from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + ) + from adr_kit.enforcement.pipeline import EnforcementPipeline + + metadata = ContractMetadata( + hash="abc123", + source_adrs=[], + adr_directory=str(tmp_path), + ) + contract = ConstraintsContract( + metadata=metadata, + constraints=MergedConstraints(), + provenance={}, + approved_adrs=[], + ) + + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract) + + skipped_names = {s.adapter for s in result.skipped_adapters} + assert "eslint" in skipped_names + assert "ruff" in skipped_names + assert len(result.fragments_applied) == 0 + + def test_pipeline_applies_eslint_when_imports_present(self, tmp_path): + """ESLint adapter runs and writes file when import constraints exist.""" + from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + ) + from adr_kit.core.model import ImportPolicy + from adr_kit.enforcement.pipeline import EnforcementPipeline + + metadata = ContractMetadata( + hash="abc123", + source_adrs=["ADR-0001"], + adr_directory=str(tmp_path), + ) + contract = ConstraintsContract( + metadata=metadata, + constraints=MergedConstraints(imports=ImportPolicy(disallow=["axios"])), + provenance={}, + approved_adrs=[], + ) + + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract) + + eslint_fragments = [ + f for f in result.fragments_applied if f.adapter == "eslint" + ] + assert len(eslint_fragments) == 1 + assert (tmp_path / ".eslintrc.adrs.json").exists() + + def test_pipeline_produces_idempotency_hash(self, tmp_path): + """Pipeline always produces a non-empty idempotency hash.""" + from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + ) + from 
adr_kit.enforcement.pipeline import EnforcementPipeline + + metadata = ContractMetadata( + hash="abc123", + source_adrs=[], + adr_directory=str(tmp_path), + ) + contract = ConstraintsContract( + metadata=metadata, + constraints=MergedConstraints(), + provenance={}, + approved_adrs=[], + ) + + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract) + + assert len(result.idempotency_hash) == 64 + + def test_pipeline_idempotent_on_same_contract(self, tmp_path): + """Same contract compiled twice → identical idempotency hash.""" + from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + ) + from adr_kit.core.model import ImportPolicy + from adr_kit.enforcement.pipeline import EnforcementPipeline + + metadata = ContractMetadata( + hash="abc123", + source_adrs=["ADR-0001"], + adr_directory=str(tmp_path), + ) + contract = ConstraintsContract( + metadata=metadata, + constraints=MergedConstraints(imports=ImportPolicy(disallow=["axios"])), + provenance={}, + approved_adrs=[], + ) + + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + r1 = pipeline.compile(contract=contract) + r2 = pipeline.compile(contract=contract) + + assert r1.idempotency_hash == r2.idempotency_hash + + def test_pipeline_provenance_from_contract(self, tmp_path): + """Pipeline exposes contract provenance in result.""" + from adr_kit.contract.models import ( + ConstraintsContract, + ContractMetadata, + PolicyProvenance, + ) + from adr_kit.enforcement.pipeline import EnforcementPipeline + + rule_path = "imports.disallow.axios" + prov = PolicyProvenance( + adr_id="ADR-0001", + adr_title="Use React Query", + rule_path=rule_path, + effective_date=datetime(2024, 1, 1, tzinfo=timezone.utc), + clause_id=PolicyProvenance.make_clause_id("ADR-0001", rule_path), + ) + metadata = ContractMetadata( + hash="abc123", + source_adrs=["ADR-0001"], + adr_directory=str(tmp_path), + ) + contract = ConstraintsContract( 
+ metadata=metadata, + constraints=MergedConstraints(), + provenance={rule_path: prov}, + approved_adrs=[], + ) + + pipeline = EnforcementPipeline(adr_dir=tmp_path, project_path=tmp_path) + result = pipeline.compile(contract=contract) + + assert len(result.provenance) == 1 + entry = result.provenance[0] + assert entry.rule == rule_path + assert entry.source_adr_id == "ADR-0001" + assert len(entry.clause_id) == 12 From 4777bcfa3080d36cbd5b7ec1a8d9fc89eae62f45 Mon Sep 17 00:00:00 2001 From: kschlt Date: Wed, 25 Mar 2026 11:11:40 +0100 Subject: [PATCH 3/4] chore: update CHANGELOG for CPL task --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17db8c7..f1419bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Canonical enforcement pipeline (`EnforcementPipeline`) — single entry point for all enforcement that reads exclusively from the compiled architecture contract, never from raw ADR files +- `EnforcementResult` audit envelope produced on every ADR approval: tracks which config fragments were applied, which adapters were skipped and why, any conflicts detected, clause-level provenance, and an idempotency hash (same contract → identical hash) +- Contract-driven ESLint adapter (`generate_eslint_config_from_contract`) — generates `no-restricted-imports` rules directly from compiled `MergedConstraints` +- Contract-driven Ruff adapter (`generate_ruff_config_from_contract`) — generates `banned-from` rules from compiled Python and import constraints +- `clause_id` field on every provenance entry — deterministic 12-char identifier (`sha256(adr_id:rule_path)[:12]`) enabling clause-level traceability from enforcement artifacts back to source ADRs +- Topological sort in policy merger — ADRs are now ordered by supersession relationships (Kahn's algorithm) before merging, so superseding ADRs correctly 
override their predecessors; falls back to date sort when no supersession relationships exist - `CHANGELOG.md` with full version history - `TECHNICAL.md` with implementation details for each layer - `CONTRIBUTING.md` with development environment setup From 65fb5d81e7993fbc3cc067fecbc4a4de3348585a Mon Sep 17 00:00:00 2001 From: kschlt Date: Wed, 25 Mar 2026 11:13:56 +0100 Subject: [PATCH 4/4] fix: resolve mypy type errors in pipeline and approval workflow Fixes two mypy errors caught during quality gate: - EnforcementResult.idempotency_hash Field uses explicit default= kwarg so pydantic plugin recognises the default correctly - _count_policy_rules_applied casts dict.get() result to int to satisfy no-any-return --- adr_kit/decision/workflows/approval.py | 2 +- adr_kit/enforcement/pipeline.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/adr_kit/decision/workflows/approval.py b/adr_kit/decision/workflows/approval.py index a61af92..29cb76e 100644 --- a/adr_kit/decision/workflows/approval.py +++ b/adr_kit/decision/workflows/approval.py @@ -458,7 +458,7 @@ def _count_policy_rules_applied(self, automation_results: dict[str, Any]) -> int """Count total policy rules applied across all systems.""" pipeline = automation_results.get("enforcement_pipeline", {}) if pipeline.get("success"): - return pipeline.get("fragments_applied", 0) + return int(pipeline.get("fragments_applied", 0)) return 0 def _extract_updated_configurations( diff --git a/adr_kit/enforcement/pipeline.py b/adr_kit/enforcement/pipeline.py index 05154ce..99554df 100644 --- a/adr_kit/enforcement/pipeline.py +++ b/adr_kit/enforcement/pipeline.py @@ -81,7 +81,8 @@ class EnforcementResult(BaseModel): fallback_promptlets: list[str] = Field(default_factory=list) provenance: list[ProvenanceEntry] = Field(default_factory=list) idempotency_hash: str = Field( - "", description="SHA-256 of all outputs — identical on re-run with same inputs" + default="", + description="SHA-256 of all outputs — 
identical on re-run with same inputs", ) def compute_idempotency_hash(self) -> str: