diff --git a/docs/superpowers/plans/2026-04-09-prompt-adherence.md b/docs/superpowers/plans/2026-04-09-prompt-adherence.md new file mode 100644 index 00000000..7e53db97 --- /dev/null +++ b/docs/superpowers/plans/2026-04-09-prompt-adherence.md @@ -0,0 +1,683 @@ +# Prompt Adherence Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build a pipeline step that checks the final plan against the original user prompt and produces a scored report showing which user directives were honored, softened, or ignored. + +**Architecture:** Two-phase LLM approach (extract directives, then score each against the plan). Follows the same pattern as `premortem.py`: Pydantic structured output, `LLMExecutor` for model fallback, dataclass for results with `save_raw`/`save_markdown` methods. Luigi task wired after `self_audit`, before `report`. 
+ +**Tech Stack:** Python 3.13, llama-index (structured LLM output), Pydantic v2, Luigi + +--- + +## File Structure + +``` +worker_plan/worker_plan_internal/ + diagnostics/ + prompt_adherence.py — Phase 1 + Phase 2 logic, Pydantic models, markdown generation + tests/ + test_prompt_adherence.py — Unit tests for Pydantic models and markdown generation + plan/nodes/ + prompt_adherence.py — Luigi task (PromptAdherenceTask) +worker_plan/worker_plan_api/ + filenames.py — Add PROMPT_ADHERENCE_RAW, PROMPT_ADHERENCE_MARKDOWN +``` + +--- + +### Task 1: FilenameEnum entries + +**Files:** +- Modify: `worker_plan/worker_plan_api/filenames.py` + +- [ ] **Step 1: Add filename entries** + +Add after the `SELF_AUDIT_MARKDOWN` line: + +```python + PROMPT_ADHERENCE_RAW = "prompt_adherence_raw.json" + PROMPT_ADHERENCE_MARKDOWN = "prompt_adherence.md" +``` + +- [ ] **Step 2: Verify import works** + +Run: `cd worker_plan && .venv/bin/python -c "from worker_plan_api.filenames import FilenameEnum; print(FilenameEnum.PROMPT_ADHERENCE_RAW.value)"` +Expected: `prompt_adherence_raw.json` + +- [ ] **Step 3: Commit** + +```bash +git add worker_plan/worker_plan_api/filenames.py +git commit -m "feat: add FilenameEnum entries for prompt adherence" +``` + +--- + +### Task 2: Pydantic models and prompt logic + +**Files:** +- Create: `worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py` +- Create: `worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py +import unittest +from worker_plan_internal.diagnostics.prompt_adherence import ( + DirectiveType, + Directive, + DirectiveExtractionResult, + AdherenceCategory, + AdherenceResult, + AdherenceScoreResult, + PromptAdherence, +) + + +class TestDirectiveModel(unittest.TestCase): + def test_directive_valid(self): + d = Directive( + directive_id="D1", + 
directive_type=DirectiveType.CONSTRAINT, + text="Budget: DKK 500M", + importance_5=5, + ) + self.assertEqual(d.directive_id, "D1") + self.assertEqual(d.directive_type, DirectiveType.CONSTRAINT) + self.assertEqual(d.importance_5, 5) + + def test_directive_extraction_result(self): + result = DirectiveExtractionResult( + directives=[ + Directive(directive_id="D1", directive_type=DirectiveType.CONSTRAINT, text="Budget: DKK 500M", importance_5=5), + Directive(directive_id="D2", directive_type=DirectiveType.STATED_FACT, text="East Wing demolished", importance_5=5), + ] + ) + self.assertEqual(len(result.directives), 2) + + +class TestAdherenceResultModel(unittest.TestCase): + def test_adherence_result_valid(self): + r = AdherenceResult( + directive_id="D1", + adherence_5=3, + category=AdherenceCategory.SOFTENED, + evidence="Budget adjusted to DKK 800M", + explanation="The plan increased the budget beyond the stated constraint.", + ) + self.assertEqual(r.adherence_5, 3) + self.assertEqual(r.category, AdherenceCategory.SOFTENED) + + def test_adherence_score_result(self): + result = AdherenceScoreResult( + results=[ + AdherenceResult( + directive_id="D1", adherence_5=5, + category=AdherenceCategory.FULLY_HONORED, + evidence="Budget: DKK 500M", explanation="Honored exactly.", + ), + AdherenceResult( + directive_id="D2", adherence_5=1, + category=AdherenceCategory.CONTRADICTED, + evidence="Demolition permit required", explanation="Plan ignores stated fact.", + ), + ] + ) + self.assertEqual(len(result.results), 2) + + +class TestPromptAdherenceMarkdown(unittest.TestCase): + def test_convert_to_markdown_produces_report(self): + directives = DirectiveExtractionResult( + directives=[ + Directive(directive_id="D1", directive_type=DirectiveType.CONSTRAINT, text="Budget: DKK 500M", importance_5=5), + Directive(directive_id="D2", directive_type=DirectiveType.STATED_FACT, text="East Wing demolished", importance_5=5), + ] + ) + scores = AdherenceScoreResult( + results=[ + 
AdherenceResult( + directive_id="D1", adherence_5=5, + category=AdherenceCategory.FULLY_HONORED, + evidence="Budget: DKK 500M", explanation="Honored.", + ), + AdherenceResult( + directive_id="D2", adherence_5=1, + category=AdherenceCategory.CONTRADICTED, + evidence="Demolition permit required", + explanation="Plan contradicts stated fact.", + ), + ] + ) + markdown = PromptAdherence.convert_to_markdown(directives, scores) + self.assertIn("# Prompt Adherence Report", markdown) + self.assertIn("Budget: DKK 500M", markdown) + self.assertIn("contradicted", markdown) + self.assertIn("Overall Adherence", markdown) + + def test_overall_score_calculation(self): + # D1: importance=5, adherence=5 -> weighted=25 + # D2: importance=5, adherence=1 -> weighted=5 + # total weighted = 30, max = 50, score = 60% + directives = DirectiveExtractionResult( + directives=[ + Directive(directive_id="D1", directive_type=DirectiveType.CONSTRAINT, text="A", importance_5=5), + Directive(directive_id="D2", directive_type=DirectiveType.STATED_FACT, text="B", importance_5=5), + ] + ) + scores = AdherenceScoreResult( + results=[ + AdherenceResult(directive_id="D1", adherence_5=5, category=AdherenceCategory.FULLY_HONORED, evidence="", explanation=""), + AdherenceResult(directive_id="D2", adherence_5=1, category=AdherenceCategory.CONTRADICTED, evidence="", explanation=""), + ] + ) + score = PromptAdherence.calculate_overall_score(directives, scores) + self.assertEqual(score, 60) + + def test_overall_score_empty(self): + directives = DirectiveExtractionResult(directives=[]) + scores = AdherenceScoreResult(results=[]) + score = PromptAdherence.calculate_overall_score(directives, scores) + self.assertEqual(score, 100) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd worker_plan && .venv/bin/python -m pytest worker_plan_internal/diagnostics/tests/test_prompt_adherence.py -v` +Expected: FAIL with `ModuleNotFoundError` + +- [ ] **Step 3: Implement prompt_adherence.py** + +```python +# 
worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py +""" +Prompt Adherence: check how faithfully the final plan follows the original user prompt. + +Phase 1: Extract directives (constraints, stated facts, requirements, banned words, intent) from plan.txt. +Phase 2: Score each directive against the final plan artifacts. + +PROMPT> python -m worker_plan_internal.diagnostics.prompt_adherence +""" +import json +import logging +from enum import Enum +from dataclasses import dataclass +from typing import List +from pydantic import BaseModel, Field +from llama_index.core.llms import ChatMessage, MessageRole +from llama_index.core.llms.llm import LLM +from worker_plan_internal.llm_util.llm_executor import LLMExecutor, PipelineStopRequested +from worker_plan_internal.llm_util.llm_errors import LLMChatError + +logger = logging.getLogger(__name__) + + +# -- Pydantic models for Phase 1: Directive Extraction ------------------------- + +class DirectiveType(str, Enum): + CONSTRAINT = "constraint" + STATED_FACT = "stated_fact" + REQUIREMENT = "requirement" + BANNED = "banned" + INTENT = "intent" + + +class Directive(BaseModel): + directive_id: str = Field(description="Enumerate as 'D1', 'D2', 'D3', etc.") + directive_type: DirectiveType = Field(description=( + "constraint: explicit numeric or scope limits (budget, timeline, capacity). " + "stated_fact: things the user says are already true about the world. " + "requirement: what must be built or done. " + "banned: words, approaches, or technologies the user explicitly prohibits. " + "intent: the user's posture, tone, or implied expectations about execution vs. study." + )) + text: str = Field(description="The user's words — short quote or close paraphrase (under 100 chars).") + importance_5: int = Field(description="1 (minor detail) to 5 (core requirement). 
Rate how central this is to the user's request.") + + +class DirectiveExtractionResult(BaseModel): + directives: List[Directive] = Field(description="5-15 directives extracted from the user's prompt.") + + +# -- Pydantic models for Phase 2: Adherence Scoring --------------------------- + +class AdherenceCategory(str, Enum): + FULLY_HONORED = "fully_honored" + PARTIALLY_HONORED = "partially_honored" + SOFTENED = "softened" + IGNORED = "ignored" + CONTRADICTED = "contradicted" + UNSOLICITED_CAVEAT = "unsolicited_caveat" + + +class AdherenceResult(BaseModel): + directive_id: str = Field(description="References a directive from Phase 1.") + adherence_5: int = Field(description="1 (ignored/contradicted) to 5 (fully honored).") + category: AdherenceCategory = Field(description=( + "fully_honored: plan respects this exactly. " + "partially_honored: plan addresses it but incompletely. " + "softened: plan weakens the requirement. " + "ignored: plan doesn't address it at all. " + "contradicted: plan says the opposite. " + "unsolicited_caveat: plan adds qualifications the user didn't ask for." + )) + evidence: str = Field(description="Direct quote from the plan (under 200 chars).") + explanation: str = Field(description="How the plan handled this directive and why this score was given.") + + +class AdherenceScoreResult(BaseModel): + results: List[AdherenceResult] = Field(description="One scoring result per directive from Phase 1.") + + +# -- System prompts ------------------------------------------------------------ + +EXTRACT_DIRECTIVES_SYSTEM_PROMPT = """\ +You are analyzing the original user prompt for a project planning pipeline. + +Your job is to extract the user's directives — the things the plan MUST respect. \ +These are the user's stated constraints, facts about the world, requirements, \ +banned items, and implied intent. 
+ +Focus on things that are easy for a planning pipeline to dilute: +- Stated facts about the current state of the world (e.g., "the building is already demolished") +- Hard numeric constraints (budget, timeline, capacity) +- Explicit scope boundaries (what to build, what NOT to build) +- Banned words or approaches +- The user's posture: are they saying "execute this" or "study whether to do this"? + +Extract 5-15 directives. Prioritize specificity over quantity. \ +Rate importance from 1 (minor detail) to 5 (core requirement). + +Do NOT extract generic project management advice. \ +Only extract what the USER specifically stated or clearly implied. +""" + +SCORE_ADHERENCE_SYSTEM_PROMPT = """\ +You are checking whether a project plan faithfully follows the user's original directives. + +You will receive: +1. The user's original prompt +2. A list of extracted directives (what the user asked for) +3. The final plan artifacts + +For each directive, score how well the plan honored it: +- adherence_5: 1 (ignored or contradicted) to 5 (fully honored) +- category: what happened to this directive in the plan +- evidence: quote from the plan (under 200 chars) showing how it was handled +- explanation: why you gave this score + +Be strict. The user wrote their prompt for a reason. If the plan softens \ +"100% renewable" to "aim for 60-80%", that is SOFTENED, not PARTIALLY_HONORED. \ +If the user says "the East Wing is already demolished" and the plan includes \ +demolition permitting, that is CONTRADICTED. + +Plans that add feasibility studies, risk disclaimers, or scope reductions that \ +the user didn't ask for should be flagged as UNSOLICITED_CAVEAT. + +Plans that use generic project management boilerplate instead of addressing \ +the specific problem should score low on adherence. 
+""" + + +# -- Business logic ------------------------------------------------------------ + +@dataclass +class PromptAdherence: + system_prompt_phase1: str + system_prompt_phase2: str + user_prompt: str + directives: dict + scores: dict + metadata: dict + markdown: str + + @classmethod + def execute(cls, llm_executor: LLMExecutor, plan_prompt: str, plan_context: str) -> 'PromptAdherence': + if not isinstance(llm_executor, LLMExecutor): + raise ValueError("Invalid LLMExecutor instance.") + if not isinstance(plan_prompt, str): + raise ValueError("Invalid plan_prompt.") + if not isinstance(plan_context, str): + raise ValueError("Invalid plan_context.") + + system_prompt_phase1 = EXTRACT_DIRECTIVES_SYSTEM_PROMPT.strip() + system_prompt_phase2 = SCORE_ADHERENCE_SYSTEM_PROMPT.strip() + + # Phase 1: Extract directives from the original prompt + logger.info("Prompt Adherence Phase 1: Extracting directives from plan prompt...") + phase1_messages = [ + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt_phase1), + ChatMessage(role=MessageRole.USER, content=f"User's original prompt:\n{plan_prompt}"), + ] + + def execute_phase1(llm: LLM) -> dict: + sllm = llm.as_structured_llm(DirectiveExtractionResult) + chat_response = sllm.chat(phase1_messages) + metadata = dict(llm.metadata) + metadata["llm_classname"] = llm.class_name() + return {"pydantic_response": chat_response.raw, "metadata": metadata} + + try: + phase1_result = llm_executor.run(execute_phase1) + except PipelineStopRequested: + raise + except Exception as e: + llm_error = LLMChatError(cause=e) + logger.error(f"Phase 1 failed [{llm_error.error_id}]", exc_info=True) + raise llm_error from e + + extraction: DirectiveExtractionResult = phase1_result["pydantic_response"] + logger.info(f"Phase 1 complete: extracted {len(extraction.directives)} directives.") + + # Phase 2: Score each directive against the plan + logger.info("Prompt Adherence Phase 2: Scoring directives against final plan...") + directives_json = 
json.dumps(extraction.model_dump(), indent=2) + phase2_messages = [ + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt_phase2), + ChatMessage(role=MessageRole.USER, content=( + f"User's original prompt:\n{plan_prompt}\n\n" + f"Extracted directives:\n{directives_json}\n\n" + f"Final plan artifacts:\n{plan_context}" + )), + ] + + def execute_phase2(llm: LLM) -> dict: + sllm = llm.as_structured_llm(AdherenceScoreResult) + chat_response = sllm.chat(phase2_messages) + metadata = dict(llm.metadata) + metadata["llm_classname"] = llm.class_name() + return {"pydantic_response": chat_response.raw, "metadata": metadata} + + try: + phase2_result = llm_executor.run(execute_phase2) + except PipelineStopRequested: + raise + except Exception as e: + llm_error = LLMChatError(cause=e) + logger.error(f"Phase 2 failed [{llm_error.error_id}]", exc_info=True) + raise llm_error from e + + scoring: AdherenceScoreResult = phase2_result["pydantic_response"] + logger.info(f"Phase 2 complete: scored {len(scoring.results)} directives.") + + metadata = { + "phase1": phase1_result["metadata"], + "phase2": phase2_result["metadata"], + } + markdown = cls.convert_to_markdown(extraction, scoring) + + return PromptAdherence( + system_prompt_phase1=system_prompt_phase1, + system_prompt_phase2=system_prompt_phase2, + user_prompt=plan_prompt, + directives=extraction.model_dump(), + scores=scoring.model_dump(), + metadata=metadata, + markdown=markdown, + ) + + def to_dict(self, include_metadata=True, include_system_prompt=True, include_user_prompt=True, include_markdown=True) -> dict: + d = { + "directives": self.directives, + "scores": self.scores, + } + if include_metadata: + d["metadata"] = self.metadata + if include_system_prompt: + d["system_prompt_phase1"] = self.system_prompt_phase1 + d["system_prompt_phase2"] = self.system_prompt_phase2 + if include_user_prompt: + d["user_prompt"] = self.user_prompt + if include_markdown: + d["markdown"] = self.markdown + return d + + def save_raw(self, 
file_path: str) -> None: + with open(file_path, 'w') as f: + f.write(json.dumps(self.to_dict(), indent=2)) + + def save_markdown(self, output_file_path: str) -> None: + with open(output_file_path, 'w', encoding='utf-8') as f: + f.write(self.markdown) + + @staticmethod + def calculate_overall_score(directives: DirectiveExtractionResult, scores: AdherenceScoreResult) -> int: + """Weighted average: sum(adherence_5 * importance_5) / sum(5 * importance_5) as integer percentage.""" + if not directives.directives: + return 100 + importance_map = {d.directive_id: d.importance_5 for d in directives.directives} + weighted_sum = 0 + max_sum = 0 + for r in scores.results: + importance = importance_map.get(r.directive_id, 3) + weighted_sum += r.adherence_5 * importance + max_sum += 5 * importance + if max_sum == 0: + return 100 + return round(weighted_sum * 100 / max_sum) + + @staticmethod + def convert_to_markdown(directives: DirectiveExtractionResult, scores: AdherenceScoreResult) -> str: + lines: list[str] = [] + lines.append("# Prompt Adherence Report") + lines.append("") + + # Build lookup + importance_map = {d.directive_id: d for d in directives.directives} + + # Calculate overall score + overall = PromptAdherence.calculate_overall_score(directives, scores) + lines.append(f"**Overall Adherence: {overall}%**") + lines.append("") + + # Sort by severity: importance * (6 - adherence), worst first + scored_items = [] + for r in scores.results: + d = importance_map.get(r.directive_id) + importance = d.importance_5 if d else 3 + severity = importance * (6 - r.adherence_5) + scored_items.append((severity, d, r)) + scored_items.sort(key=lambda x: x[0], reverse=True) + + # Summary table + lines.append("## Summary") + lines.append("") + lines.append("| ID | Directive | Type | Importance | Adherence | Category |") + lines.append("|----|-----------|------|------------|-----------|----------|") + for _, d, r in scored_items: + directive_text = d.text if d else "Unknown" + 
directive_type = d.directive_type.value if d else "unknown" + lines.append( + f"| {r.directive_id} | {_escape_table_cell(directive_text)} " + f"| {directive_type} | {d.importance_5 if d else '?'}/5 " + f"| {r.adherence_5}/5 | {r.category.value} |" + ) + lines.append("") + + # Detail section for poorly-scored directives + poor_items = [(sev, d, r) for sev, d, r in scored_items if r.adherence_5 <= 3] + if poor_items: + lines.append("## Issues") + lines.append("") + for _, d, r in poor_items: + directive_text = d.text if d else "Unknown" + lines.append(f"### {r.directive_id}: {directive_text}") + lines.append("") + lines.append(f"- **Category:** {r.category.value}") + lines.append(f"- **Adherence:** {r.adherence_5}/5") + lines.append(f"- **Importance:** {d.importance_5 if d else '?'}/5") + lines.append(f"- **Evidence:** {r.evidence}") + lines.append(f"- **Explanation:** {r.explanation}") + lines.append("") + + return "\n".join(lines) + + +def _escape_table_cell(text: str) -> str: + return text.replace("|", "\\|").replace("\n", " ") +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd worker_plan && .venv/bin/python -m pytest worker_plan_internal/diagnostics/tests/test_prompt_adherence.py -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py +git commit -m "feat: add prompt adherence Pydantic models, prompts, and markdown generation" +``` + +--- + +### Task 3: Luigi task + +**Files:** +- Create: `worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py` + +- [ ] **Step 1: Implement the Luigi task** + +```python +# worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py +"""PromptAdherenceTask - Check how faithfully the plan follows the original prompt.""" +from worker_plan_internal.plan.run_plan_pipeline import PlanTask +from worker_plan_internal.diagnostics.prompt_adherence 
import PromptAdherence +from worker_plan_internal.llm_util.llm_executor import LLMExecutor +from worker_plan_api.filenames import FilenameEnum +from worker_plan_internal.plan.nodes.setup import SetupTask +from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask +from worker_plan_internal.plan.nodes.executive_summary import ExecutiveSummaryTask +from worker_plan_internal.plan.nodes.consolidate_assumptions_markdown import ConsolidateAssumptionsMarkdownTask + + +class PromptAdherenceTask(PlanTask): + """Score how faithfully the final plan follows the user's original prompt.""" + + def output(self): + return { + 'raw': self.local_target(FilenameEnum.PROMPT_ADHERENCE_RAW), + 'markdown': self.local_target(FilenameEnum.PROMPT_ADHERENCE_MARKDOWN), + } + + def requires(self): + return { + 'setup': self.clone(SetupTask), + 'project_plan': self.clone(ProjectPlanTask), + 'executive_summary': self.clone(ExecutiveSummaryTask), + 'consolidate_assumptions_markdown': self.clone(ConsolidateAssumptionsMarkdownTask), + } + + def run_inner(self): + llm_executor: LLMExecutor = self.create_llm_executor() + + with self.input()['setup'].open("r") as f: + plan_prompt = f.read() + with self.input()['project_plan']['markdown'].open("r") as f: + project_plan_markdown = f.read() + with self.input()['executive_summary']['markdown'].open("r") as f: + executive_summary_markdown = f.read() + with self.input()['consolidate_assumptions_markdown']['full'].open("r") as f: + assumptions_markdown = f.read() + + plan_context = ( + f"File 'executive_summary.md':\n{executive_summary_markdown}\n\n" + f"File 'project_plan.md':\n{project_plan_markdown}\n\n" + f"File 'consolidate_assumptions_full.md':\n{assumptions_markdown}" + ) + + result = PromptAdherence.execute( + llm_executor=llm_executor, + plan_prompt=plan_prompt, + plan_context=plan_context, + ) + + result.save_raw(self.output()['raw'].path) + result.save_markdown(self.output()['markdown'].path) +``` + +- [ ] **Step 2: Verify import 
works** + +Run: `cd worker_plan && .venv/bin/python -c "from worker_plan_internal.plan.nodes.prompt_adherence import PromptAdherenceTask; print('OK')"` +Expected: `OK` + +- [ ] **Step 3: Commit** + +```bash +git add worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py +git commit -m "feat: add PromptAdherenceTask Luigi node" +``` + +--- + +### Task 4: Wire into pipeline and report + +**Files:** +- Modify: `worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py` +- Modify: `worker_plan/worker_plan_internal/plan/nodes/report.py` + +- [ ] **Step 1: Add to full_plan_pipeline.py** + +Add the import at the top with the other node imports: + +```python +from worker_plan_internal.plan.nodes.prompt_adherence import PromptAdherenceTask +``` + +Add to the `requires()` dict, after `'self_audit'` and before `'report'`: + +```python + 'prompt_adherence': self.clone(PromptAdherenceTask), +``` + +- [ ] **Step 2: Add to report.py** + +Add the import at the top: + +```python +from worker_plan_internal.plan.nodes.prompt_adherence import PromptAdherenceTask +``` + +Add to `requires()` dict: + +```python + 'prompt_adherence': self.clone(PromptAdherenceTask), +``` + +In `run_inner()`, find where `self_audit` is appended and add after it: + +```python + rg.append_markdown_with_tables('Prompt Adherence', self.input()['prompt_adherence']['markdown'].path) +``` + +- [ ] **Step 3: Run full test suite** + +Run: `cd worker_plan && .venv/bin/python -m pytest -q` +Expected: All tests pass + +- [ ] **Step 4: Commit** + +```bash +git add worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py worker_plan/worker_plan_internal/plan/nodes/report.py +git commit -m "feat: wire PromptAdherenceTask into pipeline and report" +``` + +--- + +### Task 5: Integration verification + +- [ ] **Step 1: Verify extract_dag picks up the new node** + +Run: `cd worker_plan && .venv/bin/python -c "from worker_plan_internal.extract_dag import extract_dag; dag = extract_dag(); nodes = 
{n['id'] for n in dag['nodes']}; assert 'prompt_adherence' in nodes; print(f'OK: {len(nodes)} nodes')"` +Expected: `OK: <count> nodes` (where `<count>` is one more than before this change) + +- [ ] **Step 2: Run full test suite** + +Run: `cd worker_plan && .venv/bin/python -m pytest -q` +Expected: All tests pass, no regressions + +- [ ] **Step 3: Commit any fixes** + +Only if step 2 revealed issues. Otherwise skip. diff --git a/docs/superpowers/specs/2026-04-09-prompt-adherence-design.md b/docs/superpowers/specs/2026-04-09-prompt-adherence-design.md new file mode 100644 index 00000000..ce60e58f --- /dev/null +++ b/docs/superpowers/specs/2026-04-09-prompt-adherence-design.md @@ -0,0 +1,133 @@ +# Prompt Adherence Check for PlanExe + +## Problem + +PlanExe's pipeline has a "normalization bias." Each of the ~70 nodes nudges the plan toward what a reasonable project *should* look like, and the cumulative drift over the full pipeline is significant. The user's stated reality gets overridden by the LLM's priors about what's plausible. + +This manifests as: +- **Stated facts ignored.** The user says "the East Wing has already been demolished" but the plan includes demolition permitting steps. +- **Requirements softened.** The user says "100% renewable energy" and the plan targets 60-80%. +- **Intent diluted.** The user's tone is "this is happening, execute it" but the plan spends 40% on feasibility studies. +- **Unsolicited caveats.** The plan adds qualifications, risk disclaimers, and scope reductions the user didn't ask for. +- **Generic PM filler.** The plan relies on boilerplate project management language instead of addressing the specific problem. + +Existing pipeline steps (Premise Attack, Premortem, Expert Criticism, Self Audit) assess plan *quality* — whether the plan is internally consistent, well-structured, and risk-aware. None of them check whether the plan actually does what the user asked. 
+ +## Goal + +A pipeline step that checks the final plan against the original user prompt and produces a scored report showing which user directives were honored, softened, or ignored. The user can scan the report and immediately see the degree of prompt drift. + +## Architecture + +Two-phase LLM approach: extract directives from the prompt, then score each one against the final plan. + +### Phase 1 — Extract Directives + +Read `plan.txt` (the original user prompt) and extract a structured list of directives. Each directive is one thing the user stated or implied that the plan must respect. + +```python +class DirectiveType(str, Enum): + CONSTRAINT = "constraint" # "Budget: DKK 500M", "Timeline: 12 months" + STATED_FACT = "stated_fact" # "The East Wing has already been demolished" + REQUIREMENT = "requirement" # "Build a casino", "Reeducate teachers" + BANNED = "banned" # "Banned words: blockchain/NFT" + INTENT = "intent" # "I'm not targeting revenue", tone/posture signals +``` + +Each directive has: +- `directive_id`: "D1", "D2", etc. +- `directive_type`: one of the types above +- `text`: the user's words (short quote or paraphrase) +- `importance_5`: 1 (minor detail) to 5 (core requirement) + +The LLM is instructed to extract 5-15 directives, prioritizing things that are easy to dilute: stated facts about the world, hard numbers, explicit scope boundaries, banned words, and the user's posture (execute vs. study). + +### Phase 2 — Score Against Final Plan + +Read the extracted directives plus the final plan artifacts (executive summary, project plan, consolidated assumptions). For each directive, score adherence. 
+ +```python +class AdherenceCategory(str, Enum): + FULLY_HONORED = "fully_honored" + PARTIALLY_HONORED = "partially_honored" + SOFTENED = "softened" # requirement weakened + IGNORED = "ignored" # not addressed at all + CONTRADICTED = "contradicted" # plan says the opposite + UNSOLICITED_CAVEAT = "unsolicited_caveat" # plan adds qualifications user didn't ask for +``` + +Each scoring result has: +- `directive_id`: references a Phase 1 directive +- `adherence_5`: 1 (ignored/contradicted) to 5 (fully honored) +- `category`: one of the categories above +- `evidence`: direct quote from the plan (under 200 chars) +- `explanation`: how the plan handled this directive and why the score was given + +### Output Files + +- `prompt_adherence_raw.json` — full structured data (directives + scores + metadata) +- `prompt_adherence.md` — human-readable report + +### Markdown Report Structure + +1. **Summary table** — all directives sorted by severity (importance_5 x (6 - adherence_5), worst offenders first): + +``` +| ID | Directive | Type | Importance | Adherence | Category | +|----|-----------|------|------------|-----------|----------| +| D3 | "East Wing already demolished" | stated_fact | 5/5 | 1/5 | contradicted | +| D1 | "Budget: DKK 500M" | constraint | 5/5 | 3/5 | softened | +| D7 | "No feasibility studies" | intent | 4/5 | 2/5 | ignored | +``` + +2. **Overall adherence score** — weighted average: `sum(adherence_5 * importance_5) / sum(5 * importance_5)` as a percentage. A plan that fully honors everything scores 100%. + +3. **Detail section** — for each directive scoring adherence_5 ≤ 3, the full explanation and evidence quotes from both the prompt and the plan. + +### Pipeline Placement + +After `self_audit`, before `report`. 
The task reads: +- `setup` — plan.txt (the original user prompt) +- `executive_summary` — the final plan summary +- `project_plan` — the detailed plan +- `consolidate_assumptions_markdown` — accumulated assumptions that may have drifted + +The report task includes `prompt_adherence.md` in the final HTML output. + +### FilenameEnum Entries + +```python +PROMPT_ADHERENCE_RAW = "prompt_adherence_raw.json" +PROMPT_ADHERENCE_MARKDOWN = "prompt_adherence.md" +``` + +### Code Structure + +``` +worker_plan/worker_plan_internal/ + diagnostics/ + prompt_adherence.py — Phase 1 + Phase 2 logic, Pydantic models, markdown generation + plan/nodes/ + prompt_adherence.py — Luigi task (PromptAdherenceTask) +``` + +Follows the same pattern as `premortem.py` / `nodes/premortem.py`: +- Business logic in `diagnostics/prompt_adherence.py` +- Luigi wiring in `plan/nodes/prompt_adherence.py` +- Pydantic structured output via `llm.as_structured_llm()` +- `LLMExecutor` for model fallback and retry + +### Scope Boundaries + +**In scope:** +- Extract directives from plan.txt +- Score each directive against the final plan +- Produce JSON + markdown report +- Integrate as a Luigi pipeline step +- Include in the final HTML report + +**Out of scope:** +- Fixing the drift (this step surfaces it, doesn't correct it) +- Tracing where in the pipeline drift was introduced (that's RCA's job) +- Judging plan quality (that's self_audit's job) +- Comparing multiple plans against each other diff --git a/worker_plan/app.py b/worker_plan/app.py index 35051f97..88ffb872 100644 --- a/worker_plan/app.py +++ b/worker_plan/app.py @@ -223,7 +223,7 @@ def create_run_directory(request: StartRunRequest) -> tuple[str, Path]: start_time_file.save(run_dir / FilenameEnum.START_TIME.value) plan_file = PlanFile.create(vague_plan_description=request.plan_prompt, start_time=start_time) - plan_file.save(run_dir / FilenameEnum.INITIAL_PLAN.value) + plan_file.save(run_dir / FilenameEnum.INITIAL_PLAN_RAW.value) return run_id, 
run_dir.resolve() diff --git a/worker_plan/worker_plan_api/filenames.py b/worker_plan/worker_plan_api/filenames.py index fc7ba624..783ad375 100644 --- a/worker_plan/worker_plan_api/filenames.py +++ b/worker_plan/worker_plan_api/filenames.py @@ -2,6 +2,7 @@ class FilenameEnum(str, Enum): START_TIME = "start_time.json" + INITIAL_PLAN_RAW = "plan_raw.json" INITIAL_PLAN = "plan.txt" PLANEXE_METADATA = "planexe_metadata.json" SCREEN_PLANNING_PROMPT_RAW = "screen_planning_prompt.json" @@ -128,6 +129,8 @@ class FilenameEnum(str, Enum): PREMORTEM_MARKDOWN = "premortem.md" SELF_AUDIT_RAW = "self_audit_raw.json" SELF_AUDIT_MARKDOWN = "self_audit.md" + PROMPT_ADHERENCE_RAW = "prompt_adherence_raw.json" + PROMPT_ADHERENCE_MARKDOWN = "prompt_adherence.md" REPORT = "report.html" PIPELINE_COMPLETE = "pipeline_complete.txt" diff --git a/worker_plan/worker_plan_api/plan_file.py b/worker_plan/worker_plan_api/plan_file.py index 83dc1e47..3720d1e0 100644 --- a/worker_plan/worker_plan_api/plan_file.py +++ b/worker_plan/worker_plan_api/plan_file.py @@ -1,28 +1,50 @@ """ PROMPT> python -m worker_plan_api.plan_file """ +import json from datetime import datetime from dataclasses import dataclass + +PLAN_TEMPLATE = "Plan:\n{plan_prompt}\n\nToday's date:\n{pretty_date}\n\nProject start ASAP" + + @dataclass class PlanFile: - content: str + plan_prompt: str + pretty_date: str @classmethod def create(cls, vague_plan_description: str, start_time: datetime) -> "PlanFile": pretty_date = start_time.strftime("%Y-%b-%d") - plan_prompt = ( - f"Plan:\n{vague_plan_description}\n\n" - f"Today's date:\n{pretty_date}\n\n" - "Project start ASAP" - ) - return cls(plan_prompt) + return cls(plan_prompt=vague_plan_description, pretty_date=pretty_date) + + def to_dict(self) -> dict: + return { + "plan_prompt": self.plan_prompt, + "pretty_date": self.pretty_date, + } + + @classmethod + def from_dict(cls, data: dict) -> "PlanFile": + return cls(plan_prompt=data["plan_prompt"], pretty_date=data["pretty_date"]) + + 
@classmethod + def load(cls, file_path: str) -> "PlanFile": + with open(file_path, "r", encoding="utf-8") as f: + return cls.from_dict(json.load(f)) def save(self, file_path: str) -> None: with open(file_path, "w", encoding="utf-8") as f: - f.write(self.content) + json.dump(self.to_dict(), f, indent=2) + + def to_plan_text(self) -> str: + return PLAN_TEMPLATE.format(plan_prompt=self.plan_prompt, pretty_date=self.pretty_date) + if __name__ == "__main__": start_time: datetime = datetime.now().astimezone() plan = PlanFile.create(vague_plan_description="My plan is here!", start_time=start_time) - print(plan.content) + print(json.dumps(plan.to_dict(), indent=2)) + print("---") + print(plan.to_plan_text()) diff --git a/worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py b/worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py new file mode 100644 index 00000000..dced2e93 --- /dev/null +++ b/worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py @@ -0,0 +1,368 @@ +# worker_plan/worker_plan_internal/diagnostics/prompt_adherence.py +""" +Prompt Adherence: check how faithfully the final plan follows the original user prompt. + +Phase 1: Extract directives (constraints, stated facts, requirements, banned words, intent) from plan.txt. +Phase 2: Score each directive against the final plan artifacts. 
+ +PROMPT> python -m worker_plan_internal.diagnostics.prompt_adherence +""" +import json +import logging +from enum import Enum +from dataclasses import dataclass +from typing import List, Literal +from pydantic import BaseModel, Field +from llama_index.core.llms import ChatMessage, MessageRole +from llama_index.core.llms.llm import LLM +from worker_plan_internal.llm_util.llm_executor import LLMExecutor, PipelineStopRequested +from worker_plan_internal.llm_util.llm_errors import LLMChatError + +logger = logging.getLogger(__name__) + + +# -- Pydantic models for Phase 1: Directive Extraction ------------------------- + +class DirectiveType(str, Enum): + CONSTRAINT = "constraint" + STATED_FACT = "stated_fact" + REQUIREMENT = "requirement" + BANNED = "banned" + INTENT = "intent" + + +class Directive(BaseModel): + directive_index: int = Field(description="Index of this directive, starting from 1.") + directive_type: Literal["constraint", "stated_fact", "requirement", "banned", "intent"] = Field(description=( + "constraint: explicit numeric or scope limits (budget, timeline, capacity). " + "stated_fact: things the user says are already true about the world. " + "requirement: what must be built or done. " + "banned: words, approaches, or technologies the user explicitly prohibits. " + "intent: the user's posture, tone, or implied expectations about execution vs. study." + )) + text: str = Field(description="The user's words — short quote or close paraphrase (under 100 chars).") + importance_5: int = Field(description="1 (minor detail) to 5 (core requirement). 
Rate how central this is to the user's request.") + + +class DirectiveExtractionResult(BaseModel): + directives: List[Directive] = Field(description="5-15 directives extracted from the user's prompt.") + + +# -- Pydantic models for Phase 2: Adherence Scoring --------------------------- + +class AdherenceCategory(str, Enum): + FULLY_HONORED = "fully_honored" + PARTIALLY_HONORED = "partially_honored" + SOFTENED = "softened" + IGNORED = "ignored" + CONTRADICTED = "contradicted" + UNSOLICITED_CAVEAT = "unsolicited_caveat" + + +class AdherenceResult(BaseModel): + directive_index: int = Field(description="References a directive_index from Phase 1.") + adherence_5: int = Field(description="1 (ignored/contradicted) to 5 (fully honored).") + category: Literal["fully_honored", "partially_honored", "softened", "ignored", "contradicted", "unsolicited_caveat"] = Field(description=( + "fully_honored: plan respects this exactly. " + "partially_honored: plan addresses it but incompletely. " + "softened: plan weakens the requirement. " + "ignored: plan doesn't address it at all. " + "contradicted: plan says the opposite. " + "unsolicited_caveat: plan adds qualifications the user didn't ask for." + )) + evidence: str = Field(description="Direct quote from the plan (under 200 chars).") + explanation: str = Field(description="How the plan handled this directive and why this score was given.") + + +class AdherenceScoreResult(BaseModel): + results: List[AdherenceResult] = Field(description="One scoring result per directive from Phase 1.") + + +# -- System prompts ------------------------------------------------------------ + +EXTRACT_DIRECTIVES_SYSTEM_PROMPT = """\ +You are analyzing the original user prompt for a project planning pipeline. + +Your job is to extract the user's directives — the things the plan MUST respect. \ +These are the user's stated constraints, facts about the world, requirements, \ +banned items, and implied intent. 
+ +Focus on things that are easy for a planning pipeline to dilute: +- Stated facts about the current state of the world (e.g., "the building is already demolished") +- Hard numeric constraints (budget, timeline, capacity) +- Explicit scope boundaries (what to build, what NOT to build) +- Banned words or approaches +- The user's posture: are they saying "execute this" or "study whether to do this"? + +Extract 5-15 directives. Prioritize specificity over quantity. \ +Rate importance from 1 (minor detail) to 5 (core requirement). + +Do NOT extract generic project management advice. \ +Only extract what the USER specifically stated or clearly implied. +""" + +SCORE_ADHERENCE_SYSTEM_PROMPT = """\ +You are checking whether a project plan faithfully follows the user's original directives. + +You will receive: +1. The user's original prompt +2. A list of extracted directives (what the user asked for) +3. The final plan artifacts + +For each directive, score how well the plan honored it: +- adherence_5: 1 (ignored or contradicted) to 5 (fully honored) +- category: what happened to this directive in the plan +- evidence: quote from the plan (under 200 chars) showing how it was handled +- explanation: why you gave this score + +Be strict. The user wrote their prompt for a reason. If the plan softens \ +"100% renewable" to "aim for 60-80%", that is SOFTENED, not PARTIALLY_HONORED. \ +If the user says "the East Wing is already demolished" and the plan includes \ +demolition permitting, that is CONTRADICTED. + +Plans that add feasibility studies, risk disclaimers, or scope reductions that \ +the user didn't ask for should be flagged as UNSOLICITED_CAVEAT. + +Plans that use generic project management boilerplate instead of addressing \ +the specific problem should score low on adherence. 
+""" + + +# -- Business logic ------------------------------------------------------------ + +@dataclass +class PromptAdherence: + system_prompt_phase1: str + system_prompt_phase2: str + user_prompt: str + directives: dict + scores: dict + metadata: dict + markdown: str + + @classmethod + def execute(cls, llm_executor: LLMExecutor, plan_prompt: str, plan_context: str) -> 'PromptAdherence': + if not isinstance(llm_executor, LLMExecutor): + raise ValueError("Invalid LLMExecutor instance.") + if not isinstance(plan_prompt, str): + raise ValueError("Invalid plan_prompt.") + if not isinstance(plan_context, str): + raise ValueError("Invalid plan_context.") + + system_prompt_phase1 = EXTRACT_DIRECTIVES_SYSTEM_PROMPT.strip() + system_prompt_phase2 = SCORE_ADHERENCE_SYSTEM_PROMPT.strip() + + # Phase 1: Extract directives from the original prompt + logger.info("Prompt Adherence Phase 1: Extracting directives from plan prompt...") + phase1_messages = [ + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt_phase1), + ChatMessage(role=MessageRole.USER, content=f"User's original prompt:\n{plan_prompt}"), + ] + + def execute_phase1(llm: LLM) -> dict: + sllm = llm.as_structured_llm(DirectiveExtractionResult) + chat_response = sllm.chat(phase1_messages) + metadata = dict(llm.metadata) + metadata["llm_classname"] = llm.class_name() + return {"pydantic_response": chat_response.raw, "metadata": metadata} + + try: + phase1_result = llm_executor.run(execute_phase1) + except PipelineStopRequested: + raise + except Exception as e: + llm_error = LLMChatError(cause=e) + logger.error(f"Phase 1 failed [{llm_error.error_id}]", exc_info=True) + raise llm_error from e + + extraction: DirectiveExtractionResult = phase1_result["pydantic_response"] + logger.info(f"Phase 1 complete: extracted {len(extraction.directives)} directives.") + + # Phase 2: Score each directive against the plan + logger.info("Prompt Adherence Phase 2: Scoring directives against final plan...") + directives_json = 
json.dumps(extraction.model_dump(), indent=2) + phase2_messages = [ + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt_phase2), + ChatMessage(role=MessageRole.USER, content=( + f"User's original prompt:\n{plan_prompt}\n\n" + f"Extracted directives:\n{directives_json}\n\n" + f"Final plan artifacts:\n{plan_context}" + )), + ] + + def execute_phase2(llm: LLM) -> dict: + sllm = llm.as_structured_llm(AdherenceScoreResult) + chat_response = sllm.chat(phase2_messages) + metadata = dict(llm.metadata) + metadata["llm_classname"] = llm.class_name() + return {"pydantic_response": chat_response.raw, "metadata": metadata} + + try: + phase2_result = llm_executor.run(execute_phase2) + except PipelineStopRequested: + raise + except Exception as e: + llm_error = LLMChatError(cause=e) + logger.error(f"Phase 2 failed [{llm_error.error_id}]", exc_info=True) + raise llm_error from e + + scoring: AdherenceScoreResult = phase2_result["pydantic_response"] + logger.info(f"Phase 2 complete: scored {len(scoring.results)} directives.") + + metadata = { + "phase1": phase1_result["metadata"], + "phase2": phase2_result["metadata"], + } + markdown = cls.convert_to_markdown(extraction, scoring) + + return PromptAdherence( + system_prompt_phase1=system_prompt_phase1, + system_prompt_phase2=system_prompt_phase2, + user_prompt=plan_prompt, + directives=extraction.model_dump(), + scores=scoring.model_dump(), + metadata=metadata, + markdown=markdown, + ) + + def to_dict(self, include_metadata=True, include_system_prompt=True, include_user_prompt=True, include_markdown=True) -> dict: + d = { + "directives": self.directives, + "scores": self.scores, + } + if include_metadata: + d["metadata"] = self.metadata + if include_system_prompt: + d["system_prompt_phase1"] = self.system_prompt_phase1 + d["system_prompt_phase2"] = self.system_prompt_phase2 + if include_user_prompt: + d["user_prompt"] = self.user_prompt + if include_markdown: + d["markdown"] = self.markdown + return d + + def save_raw(self, 
file_path: str) -> None: + with open(file_path, 'w') as f: + f.write(json.dumps(self.to_dict(), indent=2)) + + def save_markdown(self, output_file_path: str) -> None: + with open(output_file_path, 'w', encoding='utf-8') as f: + f.write(self.markdown) + + @staticmethod + def calculate_overall_score(directives: DirectiveExtractionResult, scores: AdherenceScoreResult) -> int: + """Weighted average: sum(adherence_5 * importance_5) / sum(5 * importance_5) as integer percentage.""" + if not directives.directives: + return 100 + importance_map = {d.directive_index: d.importance_5 for d in directives.directives} + weighted_sum = 0 + max_sum = 0 + for r in scores.results: + importance = importance_map.get(r.directive_index, 3) + weighted_sum += r.adherence_5 * importance + max_sum += 5 * importance + if max_sum == 0: + return 100 + return round(weighted_sum * 100 / max_sum) + + @staticmethod + def convert_to_markdown(directives: DirectiveExtractionResult, scores: AdherenceScoreResult) -> str: + lines: list[str] = [] + # Build lookup + importance_map = {d.directive_index: d for d in directives.directives} + + # Calculate overall score with math breakdown + weighted_parts = [] + importance_parts = [] + importances = [] + for r in scores.results: + d = importance_map.get(r.directive_index) + importance = d.importance_5 if d else 3 + importances.append(importance) + weighted_parts.append(f"{importance}×{r.adherence_5}") + importance_parts.append(str(importance)) + weighted_sum = sum( + r.adherence_5 * (importance_map.get(r.directive_index).importance_5 if importance_map.get(r.directive_index) else 3) + for r in scores.results + ) + importance_sum = sum(importances) + overall = round(weighted_sum * 100 / (importance_sum * 5)) if importance_sum > 0 else 100 + lines.append(f"**Overall Adherence: {overall}%**") + lines.append("") + if weighted_parts: + lines.append("```") + lines.append(f"IMPORTANCE_ADHERENCE_SUM = ({' + '.join(weighted_parts)}) = {weighted_sum}") + 
lines.append(f"IMPORTANCE_SUM = {' + '.join(importance_parts)} = {importance_sum}") + lines.append(f"OVERALL_ADHERENCE = IMPORTANCE_ADHERENCE_SUM / (IMPORTANCE_SUM × 5) = {weighted_sum} / {importance_sum * 5} = {overall}%") + lines.append("```") + lines.append("") + + # Sort by directive index + scored_items = [] + for r in scores.results: + d = importance_map.get(r.directive_index) + importance = d.importance_5 if d else 3 + severity = importance * (6 - r.adherence_5) + scored_items.append((severity, d, r)) + scored_items.sort(key=lambda x: x[2].directive_index) + + # Summary table + lines.append("## Summary") + lines.append("") + lines.append("| ID | Directive | Type | Importance | Adherence | Category |") + lines.append("|----|-----------|------|------------|-----------|----------|") + for _, d, r in scored_items: + directive_text = d.text if d else "Unknown" + directive_type = _DIRECTIVE_TYPE_LABELS.get(d.directive_type, d.directive_type) if d else "Unknown" + lines.append( + f"| {r.directive_index} | {_escape_table_cell(directive_text)} " + f"| {directive_type} | {d.importance_5 if d else '?'}/5 " + f"| {r.adherence_5}/5 | {_format_category(r.category)} |" + ) + lines.append("") + + # Detail section for poorly-scored directives + poor_items = [(sev, d, r) for sev, d, r in scored_items if r.adherence_5 < 5] + poor_items.sort(key=lambda x: x[0], reverse=True) + if poor_items: + lines.append("## Issues") + lines.append("") + for _, d, r in poor_items: + directive_text = d.text if d else "Unknown" + lines.append(f"### Issue {r.directive_index} - {directive_text}") + lines.append("") + lines.append(f"- **Category:** {_format_category(r.category)}") + lines.append(f"- **Adherence:** {r.adherence_5}/5") + lines.append(f"- **Importance:** {d.importance_5 if d else '?'}/5") + lines.append(f"- **Evidence:** {r.evidence}") + lines.append(f"- **Explanation:** {r.explanation}") + lines.append("") + + return "\n".join(lines) + + +_DIRECTIVE_TYPE_LABELS = { + "constraint": 
"Constraint", + "stated_fact": "Stated fact", + "requirement": "Requirement", + "banned": "Banned", + "intent": "Intent", +} + + +_CATEGORY_LABELS = { + "fully_honored": "Fully honored", + "partially_honored": "Partially honored", + "softened": "Softened", + "ignored": "Ignored", + "contradicted": "Contradicted", + "unsolicited_caveat": "Unsolicited caveat", +} + + +def _format_category(category: str) -> str: + return _CATEGORY_LABELS.get(category, category) + + +def _escape_table_cell(text: str) -> str: + return text.replace("|", "\\|").replace("\n", " ") diff --git a/worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py b/worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py new file mode 100644 index 00000000..a82c3001 --- /dev/null +++ b/worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py @@ -0,0 +1,115 @@ +# worker_plan/worker_plan_internal/diagnostics/tests/test_prompt_adherence.py +import unittest +from worker_plan_internal.diagnostics.prompt_adherence import ( + DirectiveType, + Directive, + DirectiveExtractionResult, + AdherenceCategory, + AdherenceResult, + AdherenceScoreResult, + PromptAdherence, +) + + +class TestDirectiveModel(unittest.TestCase): + def test_directive_valid(self): + d = Directive( + directive_index=1, + directive_type=DirectiveType.CONSTRAINT, + text="Budget: DKK 500M", + importance_5=5, + ) + self.assertEqual(d.directive_index, 1) + self.assertEqual(d.directive_type, DirectiveType.CONSTRAINT) + self.assertEqual(d.importance_5, 5) + + def test_directive_extraction_result(self): + result = DirectiveExtractionResult( + directives=[ + Directive(directive_index=1, directive_type=DirectiveType.CONSTRAINT, text="Budget: DKK 500M", importance_5=5), + Directive(directive_index=2, directive_type=DirectiveType.STATED_FACT, text="East Wing demolished", importance_5=5), + ] + ) + self.assertEqual(len(result.directives), 2) + + +class TestAdherenceResultModel(unittest.TestCase): + 
def test_adherence_result_valid(self): + r = AdherenceResult( + directive_index=1, + adherence_5=3, + category=AdherenceCategory.SOFTENED, + evidence="Budget adjusted to DKK 800M", + explanation="The plan increased the budget beyond the stated constraint.", + ) + self.assertEqual(r.adherence_5, 3) + self.assertEqual(r.category, AdherenceCategory.SOFTENED) + + def test_adherence_score_result(self): + result = AdherenceScoreResult( + results=[ + AdherenceResult( + directive_index=1, adherence_5=5, + category=AdherenceCategory.FULLY_HONORED, + evidence="Budget: DKK 500M", explanation="Honored exactly.", + ), + AdherenceResult( + directive_index=2, adherence_5=1, + category=AdherenceCategory.CONTRADICTED, + evidence="Demolition permit required", explanation="Plan ignores stated fact.", + ), + ] + ) + self.assertEqual(len(result.results), 2) + + +class TestPromptAdherenceMarkdown(unittest.TestCase): + def test_convert_to_markdown_produces_report(self): + directives = DirectiveExtractionResult( + directives=[ + Directive(directive_index=1, directive_type=DirectiveType.CONSTRAINT, text="Budget: DKK 500M", importance_5=5), + Directive(directive_index=2, directive_type=DirectiveType.STATED_FACT, text="East Wing demolished", importance_5=5), + ] + ) + scores = AdherenceScoreResult( + results=[ + AdherenceResult( + directive_index=1, adherence_5=5, + category=AdherenceCategory.FULLY_HONORED, + evidence="Budget: DKK 500M", explanation="Honored.", + ), + AdherenceResult( + directive_index=2, adherence_5=1, + category=AdherenceCategory.CONTRADICTED, + evidence="Demolition permit required", + explanation="Plan contradicts stated fact.", + ), + ] + ) + markdown = PromptAdherence.convert_to_markdown(directives, scores) + self.assertNotIn("# Prompt Adherence Report", markdown) + self.assertIn("Budget: DKK 500M", markdown) + self.assertIn("Contradicted", markdown) + self.assertIn("Overall Adherence", markdown) + + def test_overall_score_calculation(self): + directives = 
DirectiveExtractionResult( + directives=[ + Directive(directive_index=1, directive_type=DirectiveType.CONSTRAINT, text="A", importance_5=5), + Directive(directive_index=2, directive_type=DirectiveType.STATED_FACT, text="B", importance_5=5), + ] + ) + scores = AdherenceScoreResult( + results=[ + AdherenceResult(directive_index=1, adherence_5=5, category=AdherenceCategory.FULLY_HONORED, evidence="", explanation=""), + AdherenceResult(directive_index=2, adherence_5=1, category=AdherenceCategory.CONTRADICTED, evidence="", explanation=""), + ] + ) + score = PromptAdherence.calculate_overall_score(directives, scores) + self.assertEqual(score, 60) + + def test_overall_score_empty(self): + directives = DirectiveExtractionResult(directives=[]) + scores = AdherenceScoreResult(results=[]) + score = PromptAdherence.calculate_overall_score(directives, scores) + self.assertEqual(score, 100) diff --git a/worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py b/worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py index ed51e3bd..970df417 100644 --- a/worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py +++ b/worker_plan/worker_plan_internal/plan/nodes/full_plan_pipeline.py @@ -88,6 +88,7 @@ from worker_plan_internal.plan.nodes.questions_and_answers import QuestionsAndAnswersTask from worker_plan_internal.plan.nodes.premortem import PremortemTask from worker_plan_internal.plan.nodes.self_audit import SelfAuditTask +from worker_plan_internal.plan.nodes.prompt_adherence import PromptAdherenceTask from worker_plan_internal.plan.nodes.report import ReportTask @@ -163,6 +164,7 @@ def requires(self): 'questions_and_answers': self.clone(QuestionsAndAnswersTask), 'premortem': self.clone(PremortemTask), 'self_audit': self.clone(SelfAuditTask), + 'prompt_adherence': self.clone(PromptAdherenceTask), 'report': self.clone(ReportTask), } diff --git a/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py 
b/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py new file mode 100644 index 00000000..95c83b0c --- /dev/null +++ b/worker_plan/worker_plan_internal/plan/nodes/prompt_adherence.py @@ -0,0 +1,56 @@ +"""PromptAdherenceTask - Check how faithfully the plan follows the original prompt.""" +from worker_plan_internal.plan.run_plan_pipeline import PlanTask +from worker_plan_internal.diagnostics.prompt_adherence import PromptAdherence +from worker_plan_internal.llm_util.llm_executor import LLMExecutor +from worker_plan_api.filenames import FilenameEnum +from worker_plan_api.plan_file import PlanFile +from worker_plan_internal.plan.nodes.setup import SetupTask +from worker_plan_internal.plan.nodes.project_plan import ProjectPlanTask +from worker_plan_internal.plan.nodes.executive_summary import ExecutiveSummaryTask +from worker_plan_internal.plan.nodes.consolidate_assumptions_markdown import ConsolidateAssumptionsMarkdownTask + + +class PromptAdherenceTask(PlanTask): + """Score how faithfully the final plan follows the user's original prompt.""" + + def output(self): + return { + 'raw': self.local_target(FilenameEnum.PROMPT_ADHERENCE_RAW), + 'markdown': self.local_target(FilenameEnum.PROMPT_ADHERENCE_MARKDOWN), + } + + def requires(self): + return { + 'setup': self.clone(SetupTask), + 'project_plan': self.clone(ProjectPlanTask), + 'executive_summary': self.clone(ExecutiveSummaryTask), + 'consolidate_assumptions_markdown': self.clone(ConsolidateAssumptionsMarkdownTask), + } + + def run_inner(self): + llm_executor: LLMExecutor = self.create_llm_executor() + + plan_raw_path = self.run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value + plan_file = PlanFile.load(str(plan_raw_path)) + plan_prompt = plan_file.plan_prompt + with self.input()['project_plan']['markdown'].open("r") as f: + project_plan_markdown = f.read() + with self.input()['executive_summary']['markdown'].open("r") as f: + executive_summary_markdown = f.read() + with 
self.input()['consolidate_assumptions_markdown']['full'].open("r") as f: + assumptions_markdown = f.read() + + plan_context = ( + f"File 'executive_summary.md':\n{executive_summary_markdown}\n\n" + f"File 'project_plan.md':\n{project_plan_markdown}\n\n" + f"File 'consolidate_assumptions_full.md':\n{assumptions_markdown}" + ) + + result = PromptAdherence.execute( + llm_executor=llm_executor, + plan_prompt=plan_prompt, + plan_context=plan_context, + ) + + result.save_raw(self.output()['raw'].path) + result.save_markdown(self.output()['markdown'].path) diff --git a/worker_plan/worker_plan_internal/plan/nodes/report.py b/worker_plan/worker_plan_internal/plan/nodes/report.py index cc123fee..339c1ae2 100644 --- a/worker_plan/worker_plan_internal/plan/nodes/report.py +++ b/worker_plan/worker_plan_internal/plan/nodes/report.py @@ -25,6 +25,7 @@ from worker_plan_internal.plan.nodes.questions_and_answers import QuestionsAndAnswersTask from worker_plan_internal.plan.nodes.premortem import PremortemTask from worker_plan_internal.plan.nodes.self_audit import SelfAuditTask +from worker_plan_internal.plan.nodes.prompt_adherence import PromptAdherenceTask from worker_plan_internal.plan.nodes.screen_planning_prompt import ScreenPlanningPromptTask @@ -58,7 +59,8 @@ def requires(self): 'create_schedule': self.clone(CreateScheduleTask), 'questions_and_answers': self.clone(QuestionsAndAnswersTask), 'premortem': self.clone(PremortemTask), - 'self_audit': self.clone(SelfAuditTask) + 'self_audit': self.clone(SelfAuditTask), + 'prompt_adherence': self.clone(PromptAdherenceTask), } def run_inner(self): @@ -94,4 +96,5 @@ def run_inner(self): redline_gate_markdown_file_path=self.input()['redline_gate']['markdown'].path, premise_attack_markdown_file_path=self.input()['premise_attack']['markdown'].path ) + rg.append_markdown_with_tables('Prompt Adherence', self.input()['prompt_adherence']['markdown'].path) rg.save_report(self.output().path, title=title, 
execute_plan_section_hidden=REPORT_EXECUTE_PLAN_SECTION_HIDDEN) diff --git a/worker_plan/worker_plan_internal/plan/nodes/setup.py b/worker_plan/worker_plan_internal/plan/nodes/setup.py index af60ff95..56ce171e 100644 --- a/worker_plan/worker_plan_internal/plan/nodes/setup.py +++ b/worker_plan/worker_plan_internal/plan/nodes/setup.py @@ -1,14 +1,22 @@ -"""SetupTask - The plan prompt text provided by the user.""" +"""SetupTask - Convert plan_raw.json into the plan.txt used by the pipeline.""" from worker_plan_internal.plan.run_plan_pipeline import PlanTask from worker_plan_api.filenames import FilenameEnum +from worker_plan_api.plan_file import PlanFile class SetupTask(PlanTask): - """Load the user's plan prompt as the pipeline input.""" + """Read plan_raw.json and produce plan.txt from the template.""" def output(self): return self.local_target(FilenameEnum.INITIAL_PLAN) def run(self): - # The Gradio/Flask app that starts the luigi pipeline, must first create the `INITIAL_PLAN` file inside the `run_id_dir`. - # This code will ONLY run if the Gradio/Flask app *failed* to create the file. - raise AssertionError(f"This code is not supposed to be run. 
Before starting the pipeline the '{FilenameEnum.INITIAL_PLAN.value}' file must be present in the `run_id_dir`: {self.run_id_dir!r}") + raw_path = self.run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value + if not raw_path.exists(): + raise FileNotFoundError( + f"Before starting the pipeline the '{FilenameEnum.INITIAL_PLAN_RAW.value}' file " + f"must be present in the run_id_dir: {self.run_id_dir!r}" + ) + plan_file = PlanFile.load(str(raw_path)) + plan_text = plan_file.to_plan_text() + with open(self.output().path, "w", encoding="utf-8") as f: + f.write(plan_text) diff --git a/worker_plan/worker_plan_internal/plan/ping_llm.py b/worker_plan/worker_plan_internal/plan/ping_llm.py index a8dd2303..b8717669 100644 --- a/worker_plan/worker_plan_internal/plan/ping_llm.py +++ b/worker_plan/worker_plan_internal/plan/ping_llm.py @@ -37,9 +37,9 @@ def _validate_run_dir(run_id_dir: Path) -> None: raise FileNotFoundError( f"The '{FilenameEnum.START_TIME.value}' file does not exist in the run_id_dir: {run_id_dir!r}" ) - if not (run_id_dir / FilenameEnum.INITIAL_PLAN.value).exists(): + if not (run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value).exists(): raise FileNotFoundError( - f"The '{FilenameEnum.INITIAL_PLAN.value}' file does not exist in the run_id_dir: {run_id_dir!r}" + f"The '{FilenameEnum.INITIAL_PLAN_RAW.value}' file does not exist in the run_id_dir: {run_id_dir!r}" ) diff --git a/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py b/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py index 00999753..30ca4b0d 100644 --- a/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py +++ b/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py @@ -213,8 +213,8 @@ def setup(self) -> None: raise NotADirectoryError(f"The run_id_dir is not a directory: {self.run_id_dir!r}") if not (self.run_id_dir / FilenameEnum.START_TIME.value).exists(): raise FileNotFoundError(f"The '{FilenameEnum.START_TIME.value}' file does not exist in the run_id_dir: {self.run_id_dir!r}") - 
if not (self.run_id_dir / FilenameEnum.INITIAL_PLAN.value).exists(): - raise FileNotFoundError(f"The '{FilenameEnum.INITIAL_PLAN.value}' file does not exist in the run_id_dir: {self.run_id_dir!r}") + if not (self.run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value).exists(): + raise FileNotFoundError(f"The '{FilenameEnum.INITIAL_PLAN_RAW.value}' file does not exist in the run_id_dir: {self.run_id_dir!r}") from worker_plan_internal.plan.nodes.full_plan_pipeline import FullPlanPipeline full_plan_pipeline_task = FullPlanPipeline( diff --git a/worker_plan/worker_plan_internal/plan/tests/test_ping_llm.py b/worker_plan/worker_plan_internal/plan/tests/test_ping_llm.py index a9cffe84..a09604ab 100644 --- a/worker_plan/worker_plan_internal/plan/tests/test_ping_llm.py +++ b/worker_plan/worker_plan_internal/plan/tests/test_ping_llm.py @@ -13,7 +13,7 @@ def test_ping_llm_report_fallback(self): with TemporaryDirectory() as temp_dir: run_id_dir = Path(temp_dir) (run_id_dir / FilenameEnum.START_TIME.value).write_text("{}", encoding="utf-8") - (run_id_dir / FilenameEnum.INITIAL_PLAN.value).write_text("Ping test", encoding="utf-8") + (run_id_dir / FilenameEnum.INITIAL_PLAN_RAW.value).write_text('{"plan_prompt": "Ping test", "pretty_date": "1984-Apr-09"}', encoding="utf-8") bad_llm = ResponseMockLLM(responses=["raise:BAD"]) good_llm = ResponseMockLLM(responses=["PONG ok"]) diff --git a/worker_plan/worker_plan_internal/report/report_generator.py b/worker_plan/worker_plan_internal/report/report_generator.py index cb8a8fe5..0b24caef 100644 --- a/worker_plan/worker_plan_internal/report/report_generator.py +++ b/worker_plan/worker_plan_internal/report/report_generator.py @@ -125,7 +125,7 @@ def append_markdown_with_tables(self, document_title: str, file_path: Path, css_ if md_data is None: logging.warning(f"Document: '{document_title}'. 
Could not read markdown file: {file_path}") return - html = markdown.markdown(md_data, extensions=['tables']) + html = markdown.markdown(md_data, extensions=['tables', 'fenced_code']) self.report_item_list.append(ReportDocumentItem(document_title, html, css_classes=css_classes)) def append_csv(self, document_title: str, file_path: Path, css_classes: list[str] = []):