diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84a1121..3dde37d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -103,7 +103,6 @@ jobs: memory --dir /tmp/test-memory get FACT_ci_test_fact memory --dir /tmp/test-memory tags memory --dir /tmp/test-memory list - memory --dir /tmp/test-memory doctor test-mcp: runs-on: ubuntu-latest diff --git a/README.md b/README.md index ce56587..f8cd382 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,8 @@ AI agents lose context between sessions. This protocol gives them a structured w - **Auto-scaling** — RST files split at 50 entries, transparent to queries - **Git-native** — every memory is an RST directive, fully diffable and versioned - **MCP server** — expose memory as tools for Claude Desktop, VS Code Copilot, and other MCP clients -- **Autonomous capture** — extract memories from Git commits, CI logs, and discussion transcripts -- **Planning engine** — analyze memory graph and propose maintenance actions -- **CLI-first** — 16+ subcommands for full lifecycle management +- **Build-as-guardian** — `needs_warnings` quality gates enforce tagging, linking, and body quality at build time +- **CLI-first** — 12 subcommands for full lifecycle management ## Installation @@ -78,18 +77,12 @@ memory recall [query] [--tag ...] [--format brief|compact|context|json] memory get # Full details of one memory memory related [--hops N] # Graph walk from a memory memory list [--type TYPE] [--status S] # Browse all memories -memory update [--confidence ...] [--add-tags ...] +memory update [--confidence ...] [--add-tags ...] [--body ...] [--title ...] memory deprecate [--by NEW_ID] # Mark as deprecated memory tags [--prefix PREFIX] # Discover tags in use memory stale # Find expired/overdue memories memory review # Show memories needing review memory rebuild # Rebuild needs.json -memory capture git # Extract memories from recent commits -memory capture ci --input # Extract memories from CI/test logs -memory capture discussion --input # Extract from conversation transcripts -memory plan [--auto-apply] # Analyze graph and propose maintenance -memory apply # Execute a generated plan -memory doctor # Verify installation health ``` Key flags for `recall`: @@ -158,16 +151,11 @@ Add to `.vscode/mcp.json`: | `memory_recall` | Search memories by text/tags with formatting options | | `memory_get` | Get full details of a specific memory | | `memory_add` | Record a new memory with tags and metadata | -| `memory_update` | Update metadata (status, confidence, tags, etc.) | +| `memory_update` | Update content or metadata (title, body, status, confidence, tags, etc.) | | `memory_deprecate` | Mark a memory as deprecated | | `memory_tags` | List all tags with counts | | `memory_stale` | Find expired/overdue memories | | `memory_rebuild` | Rebuild needs.json index | -| `memory_capture_git` | Extract memories from recent Git commits | -| `memory_capture_ci` | Extract memories from CI/test log output | -| `memory_capture_discussion` | Extract memories from conversation transcripts | -| `memory_plan` | Analyze memory graph and propose maintenance actions | -| `memory_apply` | Execute a generated maintenance plan | ## Memory Types @@ -295,11 +283,8 @@ ai_memory_protocol/ └── src/ └── ai_memory_protocol/ ├── __init__.py - ├── cli.py # CLI (argparse, 16+ subcommands) - ├── mcp_server.py # MCP server (13 tools, stdio transport) - ├── capture.py # Knowledge extraction (git, CI, discussion) - ├── planner.py # Graph analysis and maintenance planning - ├── executor.py # Plan execution engine + ├── cli.py # CLI (argparse, 12 subcommands) + ├── mcp_server.py # MCP server (8 tools, stdio transport) ├── config.py # Type definitions, constants ├── engine.py # Workspace detection, search, graph walk ├── formatter.py # Output formatting (brief/compact/context/json) @@ -309,38 +294,6 @@ ai_memory_protocol/ Memory data lives in a **separate workspace** (e.g., `.memories/`), created with `memory init`. -## Autonomous Workflow - -The protocol supports a fully autonomous memory lifecycle — agents can capture, plan, and maintain knowledge without human intervention: - -``` - capture (git / CI / discussion) - │ - ▼ - plan (analyze graph → propose actions) - │ - ▼ - apply (execute plan → add/update/deprecate) - │ - ▼ - rebuild (sphinx-build → needs.json) - │ - ▼ - recall (search updated graph) -``` - -**Capture sources:** -- `memory capture git` — scans recent commits, extracts decisions, bug fixes, refactors -- `memory capture ci --input ` — parses test failures, compiler errors, deprecation warnings -- `memory capture discussion --input ` — classifies conversation into decisions, facts, preferences, risks, goals, questions - -**Planning engine:** -- `memory plan` — analyzes the memory graph for staleness, missing links, contradictions, and proposes maintenance actions -- `memory plan --auto-apply` — execute the plan immediately after analysis -- `memory apply plan.json` — execute a previously saved plan - -All captured candidates include provenance (`--source`) and are deduplicated against existing memories. - ## Build-as-Guardian The Sphinx build acts as a quality gate for the memory graph. `needs_warnings` in `conf.py` define constraints that fire during `memory rebuild`: diff --git a/pyproject.toml b/pyproject.toml index 1a1a1ea..4b34266 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ai-memory-protocol" -version = "0.3.0" +version = "0.3.1" description = "AI Memory Protocol — versioned, graph-based memory for AI agents using Sphinx-Needs" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/ai_memory_protocol/__init__.py b/src/ai_memory_protocol/__init__.py index 714084e..3cfb310 100644 --- a/src/ai_memory_protocol/__init__.py +++ b/src/ai_memory_protocol/__init__.py @@ -1,3 +1,3 @@ """AI Memory Protocol — versioned, graph-based memory for AI agents.""" -__version__ = "0.3.0" +__version__ = "0.3.1" diff --git a/src/ai_memory_protocol/capture.py b/src/ai_memory_protocol/capture.py deleted file mode 100644 index 925ef8c..0000000 --- a/src/ai_memory_protocol/capture.py +++ /dev/null @@ -1,829 +0,0 @@ -"""Capture knowledge from external sources — git, CI, discussions. - -Extract memories from: -- **Git history** — commit messages classified by conventional commit type -- **CI logs** — test failures, build errors, warnings -- **Discussions** — decisions, preferences, goals from conversation transcripts - -Usage: - from ai_memory_protocol.capture import capture_from_git, capture_from_ci, capture_from_discussion - candidates = capture_from_git(workspace, repo_path, since="2 weeks ago") - candidates = capture_from_ci(workspace, log_text) - candidates = capture_from_discussion(workspace, transcript) -""" - -from __future__ import annotations - -import re -import subprocess -from dataclasses import dataclass, field -from difflib import SequenceMatcher -from pathlib import Path -from typing import Any - -from .engine import load_needs - -# --------------------------------------------------------------------------- -# Candidate (not yet a memory — needs review before adding) -# --------------------------------------------------------------------------- - - -@dataclass -class MemoryCandidate: - """A candidate memory extracted from a source (git, CI, discussion).""" - - type: str - title: str - body: str - tags: list[str] = field(default_factory=list) - source: str = "" - confidence: str = "medium" - scope: str = "global" - # For dedup - _source_hashes: list[str] = field(default_factory=list) - - def to_dict(self) -> dict[str, Any]: - d = { - "type": self.type, - "title": self.title, - "body": self.body, - "tags": self.tags, - "source": self.source, - "confidence": self.confidence, - "scope": self.scope, - } - return {k: v for k, v in d.items() if v} - - -# --------------------------------------------------------------------------- -# Git commit parsing -# --------------------------------------------------------------------------- - - -_GIT_RECORD_SEP = "\x1e" # ASCII Record Separator between commits -_GIT_FIELD_SEP = "\x1f" # ASCII Unit Separator between fields -_GIT_LOG_FORMAT = ( - f"%H{_GIT_FIELD_SEP}%s{_GIT_FIELD_SEP}%b{_GIT_FIELD_SEP}%an{_GIT_FIELD_SEP}%ad{_GIT_RECORD_SEP}" -) - - -@dataclass -class _GitCommit: - """Parsed git commit.""" - - hash: str - subject: str - body: str - author: str - date: str - files: list[str] = field(default_factory=list) - - -def _parse_git_log(repo_path: Path, since: str, until: str) -> list[_GitCommit]: - """Run git log and parse the output.""" - # Base git log command - cmd: list[str] = [ - "git", - "log", - f"--format={_GIT_LOG_FORMAT}", - "--date=iso-strict", - ] - - # Heuristic: if arguments contain spaces (e.g. "2 weeks ago"), treat them as - # date expressions and use --since/--until. Otherwise treat them as refs and - # use git's revision range syntax. - has_since = bool(since) - has_until = bool(until) - since_is_date = has_since and (" " in since) - until_is_date = has_until and (" " in until) - - if has_since and has_until: - if since_is_date or until_is_date: - # Date-based range - cmd.append(f"--since={since}") - cmd.append(f"--until={until}") - else: - # Ref-based range: use {since}..{until} - cmd.append(f"{since}..{until}") - elif has_since: - if since_is_date: - cmd.append(f"--since={since}") - else: - cmd.append(since) - elif has_until: - if until_is_date: - cmd.append(f"--until={until}") - else: - cmd.append(until) - - try: - result = subprocess.run( - cmd, - cwd=str(repo_path), - capture_output=True, - text=True, - ) - if result.returncode != 0: - return [] - except OSError: - return [] - - commits: list[_GitCommit] = [] - # Split on record separator — safe even when %b contains newlines - for record in result.stdout.split(_GIT_RECORD_SEP): - record = record.strip() - if not record: - continue - parts = record.split(_GIT_FIELD_SEP) - if len(parts) < 5: - continue - commit = _GitCommit( - hash=parts[0].strip(), - subject=parts[1].strip(), - body=parts[2].strip(), - author=parts[3].strip(), - date=parts[4].strip(), - ) - commits.append(commit) - - # Get changed files per commit - for c in commits: - try: - files_result = subprocess.run( - ["git", "diff-tree", "--no-commit-id", "--name-only", "-r", c.hash], - cwd=str(repo_path), - capture_output=True, - text=True, - ) - c.files = [f.strip() for f in files_result.stdout.strip().split("\n") if f.strip()] - except OSError: - # Best-effort: if git diff-tree fails for this commit (e.g. git not - # available or repository in an unexpected state), leave the files - # list unchanged for this commit and continue processing others. - pass - - return commits - - -# --------------------------------------------------------------------------- -# Commit classification -# --------------------------------------------------------------------------- - -# Patterns for classifying commits by conventional commit prefix -_CLASSIFY_PATTERNS: list[tuple[str, str, str]] = [ - # (regex_pattern, memory_type, confidence) - (r"^fix[\(:]|^bugfix[\(:]|^hotfix[\(:]", "mem", "high"), - (r"^feat[\(:]|^add[\(:]|^feature[\(:]", "fact", "medium"), - (r"^refactor[\(:]|^perf[\(:]|^optimize[\(:]", "dec", "medium"), - (r"^docs[\(:]|^doc[\(:]", "fact", "low"), - (r"^test[\(:]|^tests[\(:]", "mem", "low"), - (r"^ci[\(:]|^build[\(:]|^chore[\(:]", "mem", "low"), - (r"BREAKING[ _]CHANGE", "risk", "high"), - (r"^revert[\(:]", "mem", "medium"), - (r"^style[\(:]", "pref", "low"), -] - - -def _classify_commit(commit: _GitCommit) -> tuple[str, str]: - """Classify a commit into a memory type + confidence. - - Returns (type, confidence). Falls back to "mem"/"low" for unclassifiable commits. - """ - subject = commit.subject - body_text = f"{subject} {commit.body}" - - # Check BREAKING CHANGE first (can appear anywhere) - if re.search(r"BREAKING[ _]CHANGE", body_text, re.IGNORECASE): - return "risk", "high" - - for pattern, mem_type, confidence in _CLASSIFY_PATTERNS: - if re.search(pattern, subject, re.IGNORECASE): - return mem_type, confidence - - return "mem", "low" - - -def _extract_scope(subject: str) -> str: - """Extract scope from conventional commit format. e.g. 'fix(gateway): ...' → 'gateway'.""" - match = re.match(r"^\w+\(([^)]+)\)", subject) - return match.group(1) if match else "" - - -def _infer_tags(commit: _GitCommit, repo_name: str) -> list[str]: - """Infer tags from commit metadata.""" - tags: list[str] = [f"repo:{repo_name}"] - - # Extract scope as topic tag - scope = _extract_scope(commit.subject) - if scope: - tags.append(f"topic:{scope}") - - # Infer topic from file paths - path_topics: set[str] = set() - for f in commit.files: - parts = Path(f).parts - if len(parts) >= 2: - # Use first meaningful directory as topic - for part in parts: - if part not in ("src", "lib", "test", "tests", "include", ".", ".."): - path_topics.add(part.replace("_", "-")) - break - - for topic in sorted(path_topics)[:3]: # Limit to 3 path-based topics - tag = f"topic:{topic}" - if tag not in tags: - tags.append(tag) - - return tags - - -# --------------------------------------------------------------------------- -# Grouping related commits -# --------------------------------------------------------------------------- - - -def _file_overlap(files1: list[str], files2: list[str]) -> float: - """Compute Jaccard similarity of file sets.""" - s1, s2 = set(files1), set(files2) - union = s1 | s2 - if not union: - return 0.0 - return len(s1 & s2) / len(union) - - -def _group_commits( - commits: list[_GitCommit], file_overlap_threshold: float = 0.3 -) -> list[list[_GitCommit]]: - """Group commits by file overlap (simple greedy clustering).""" - if not commits: - return [] - - groups: list[list[_GitCommit]] = [[commits[0]]] - - for commit in commits[1:]: - best_group = -1 - best_overlap = 0.0 - for i, group in enumerate(groups): - # Compare against all commits in the group - group_files = [f for c in group for f in c.files] - overlap = _file_overlap(commit.files, group_files) - if overlap > best_overlap: - best_overlap = overlap - best_group = i - - if best_overlap >= file_overlap_threshold and best_group >= 0: - groups[best_group].append(commit) - else: - groups.append([commit]) - - return groups - - -# --------------------------------------------------------------------------- -# Deduplication against existing memories -# --------------------------------------------------------------------------- - - -def _is_duplicate( - candidate: MemoryCandidate, - existing_needs: dict[str, Any], - title_threshold: float = 0.7, -) -> bool: - """Check if a candidate is a near-duplicate of an existing memory.""" - for need in existing_needs.values(): - if need.get("status") == "deprecated": - continue - existing_title = need.get("title", "").lower() - candidate_title = candidate.title.lower() - sim = SequenceMatcher(None, candidate_title, existing_title).ratio() - if sim >= title_threshold: - return True - - # Also check by source (exact commit hash match) - existing_source = need.get("source", "") - if candidate.source and candidate.source in existing_source: - return True - - return False - - -# --------------------------------------------------------------------------- -# Public interface: capture from git -# --------------------------------------------------------------------------- - - -def capture_from_git( - workspace: Path, - repo_path: Path, - since: str = "HEAD~20", - until: str = "HEAD", - repo_name: str | None = None, - deduplicate: bool = True, - min_confidence: str = "low", -) -> list[MemoryCandidate]: - """Analyze git log and generate memory candidates. - - Parameters - ---------- - workspace - Path to the memory workspace (for dedup against existing). - repo_path - Path to the git repository to analyze. - since - Start of the range (commit or date like ``"2 weeks ago"``). - until - End of the range (default: ``"HEAD"``). - repo_name - Repository name for ``repo:`` tags. Auto-detected from path if omitted. - deduplicate - If True, filter out candidates that match existing memories. - min_confidence - Minimum confidence to include. "low" includes all. - - Returns - ------- - list[MemoryCandidate] - Candidate memories ready for review and optional insertion. - """ - if repo_name is None: - repo_name = repo_path.name - - commits = _parse_git_log(repo_path, since, until) - if not commits: - return [] - - # Load existing memories for dedup - existing: dict[str, Any] = {} - if deduplicate: - try: - existing = load_needs(workspace) - except (SystemExit, Exception): - existing = {} - - # Classify and group - conf_rank = {"high": 2, "medium": 1, "low": 0} - min_conf_rank = conf_rank.get(min_confidence, 0) - - candidates: list[MemoryCandidate] = [] - - # Group related commits - groups = _group_commits(commits) - - for group in groups: - if len(group) == 1: - # Single commit → single candidate - commit = group[0] - mem_type, confidence = _classify_commit(commit) - - if conf_rank.get(confidence, 0) < min_conf_rank: - continue - - # Clean title: remove conventional commit prefix - title = re.sub(r"^\w+(\([^)]*\))?:\s*", "", commit.subject) - if not title: - title = commit.subject - - body_parts = [commit.body] if commit.body else [] - if commit.files: - body_parts.append(f"Files: {', '.join(commit.files[:10])}") - - candidate = MemoryCandidate( - type=mem_type, - title=title[:120], - body="\n".join(body_parts), - tags=_infer_tags(commit, repo_name), - source=f"commit:{commit.hash[:8]}", - confidence=confidence, - scope=f"repo:{repo_name}", - _source_hashes=[commit.hash], - ) - candidates.append(candidate) - else: - # Multiple related commits → summarize - primary = group[0] # Use first (most recent) commit - mem_type, confidence = _classify_commit(primary) - - # Upgrade confidence for grouped commits - if len(group) >= 3 and confidence == "low": - confidence = "medium" - - if conf_rank.get(confidence, 0) < min_conf_rank: - continue - - title = re.sub(r"^\w+(\([^)]*\))?:\s*", "", primary.subject) - if not title: - title = primary.subject - - body_parts = [f"Group of {len(group)} related commits:"] - for c in group[:5]: - body_parts.append(f" - {c.subject} ({c.hash[:8]})") - if len(group) > 5: - body_parts.append(f" ... and {len(group) - 5} more") - - all_files: set[str] = set() - all_tags: set[str] = set() - for c in group: - all_files.update(c.files) - for tag in _infer_tags(c, repo_name): - all_tags.add(tag) - - if all_files: - body_parts.append(f"Files: {', '.join(sorted(all_files)[:10])}") - - candidate = MemoryCandidate( - type=mem_type, - title=title[:120], - body="\n".join(body_parts), - tags=sorted(all_tags), - source=f"commit:{primary.hash[:8]}+{len(group) - 1}", - confidence=confidence, - scope=f"repo:{repo_name}", - _source_hashes=[c.hash for c in group], - ) - candidates.append(candidate) - - # Dedup against existing - if deduplicate and existing: - candidates = [c for c in candidates if not _is_duplicate(c, existing)] - - return candidates - - -def format_candidates(candidates: list[MemoryCandidate], fmt: str = "human") -> str: - """Format capture candidates for display.""" - if not candidates: - return "No new memory candidates found." - - if fmt == "json": - import json - - return json.dumps([c.to_dict() for c in candidates], indent=2, ensure_ascii=False) - - lines = [f"## {len(candidates)} memory candidate(s)\n"] - for i, c in enumerate(candidates, 1): - lines.append(f" {i}. [{c.type}] {c.title}") - lines.append(f" Tags: {', '.join(c.tags)}") - lines.append(f" Confidence: {c.confidence} | Source: {c.source}") - if c.body: - # Show first 2 lines of body - body_lines = c.body.split("\n")[:2] - for bl in body_lines: - lines.append(f" {bl}") - lines.append("") - - return "\n".join(lines) - - -# =========================================================================== -# CI Log Capture -# =========================================================================== - -# Patterns for extracting structured data from CI logs -_CI_PATTERNS: list[tuple[str, str, str, str]] = [ - # (regex, memory_type, title_template, confidence) - # Test failures - ( - r"(?:FAILED|FAIL|ERROR)\s*:?\s*(?:test_?)?(\S+?)(?:\s*[-—]\s*(.+))?$", - "mem", - "CI test failure: {name}", - "high", - ), - # Python pytest failures - ( - r"(?:FAILED)\s+([\w/]+\.py::[\w:]+)", - "mem", - "Test failure: {name}", - "high", - ), - # Compiler errors (C/C++) - ( - r"(\S+\.\w+):(\d+):\d+:\s*error:\s*(.+)", - "mem", - "Build error in {file}:{line}", - "high", - ), - # Linker errors - ( - r"(?:undefined reference to|cannot find -l)(.+)", - "mem", - "Linker error: {name}", - "high", - ), - # Deprecation warnings - ( - r"(?:DeprecationWarning|FutureWarning):\s*(.+)", - "risk", - "Deprecation warning: {name}", - "medium", - ), - # Timeout errors - ( - r"(?:TimeoutError|timed?\s*out)\s*:?\s*(.+)?", - "mem", - "Timeout: {name}", - "high", - ), - # CMake / build configuration errors - ( - r"CMake Error.*?:\s*(.+)", - "mem", - "CMake error: {name}", - "high", - ), - # Generic error lines - ( - r"^(?:Error|ERROR)\s*:?\s*(.+)", - "mem", - "CI error: {name}", - "medium", - ), -] - -# Summary line patterns: "X passed, Y failed" -_CI_SUMMARY_PATTERN = re.compile( - r"(\d+)\s+(?:passed|succeeded).*?(\d+)\s+(?:failed|errors?)", - re.IGNORECASE, -) - - -@dataclass -class _CIMatch: - """A matched CI pattern with extracted data.""" - - mem_type: str - title: str - detail: str - confidence: str - line_num: int - - -def _parse_ci_log(text: str) -> list[_CIMatch]: - """Parse CI log text and extract structured error/failure data.""" - matches: list[_CIMatch] = [] - seen_titles: set[str] = set() - - for line_num, line in enumerate(text.splitlines(), 1): - line = line.strip() - if not line: - continue - - for pattern, mem_type, title_tpl, confidence in _CI_PATTERNS: - m = re.search(pattern, line, re.IGNORECASE) - if m: - groups = m.groups() - # Build title from template and captured groups - name = (groups[0] or "").strip() if groups else "" - file_val = "" - line_val = "" - if len(groups) >= 3: - file_val = (groups[0] or "").strip() - line_val = (groups[1] or "").strip() - name = (groups[2] or "").strip() - - title = title_tpl.format( - name=name[:80] if name else "unknown", - file=file_val, - line=line_val, - )[:120] - - # Dedup within same log - if title in seen_titles: - break - seen_titles.add(title) - - detail = line[:200] - matches.append( - _CIMatch( - mem_type=mem_type, - title=title, - detail=detail, - confidence=confidence, - line_num=line_num, - ) - ) - break # One match per line - - return matches - - -def capture_from_ci( - workspace: Path, - log_text: str, - source: str = "ci-log", - tags: list[str] | None = None, - deduplicate: bool = True, -) -> list[MemoryCandidate]: - """Extract memory candidates from CI log output. - - Parameters - ---------- - workspace - Path to the memory workspace (for dedup against existing). - log_text - Raw CI log text (stdout/stderr from build or test run). - source - Source label for provenance (e.g. ``"ci:github-actions:run-123"``). - tags - Additional tags to apply to all candidates. Auto-infers ``topic:ci``. - deduplicate - If True, filter out candidates that match existing memories. - - Returns - ------- - list[MemoryCandidate] - Candidate memories ready for review and optional insertion. - """ - base_tags = ["topic:ci"] - if tags: - base_tags.extend(t for t in tags if t not in base_tags) - - matches = _parse_ci_log(log_text) - if not matches: - return [] - - # Load existing for dedup - existing: dict[str, Any] = {} - if deduplicate: - try: - existing = load_needs(workspace) - except (SystemExit, Exception): - existing = {} - - candidates: list[MemoryCandidate] = [] - for match in matches: - candidate = MemoryCandidate( - type=match.mem_type, - title=match.title, - body=f"Line {match.line_num}: {match.detail}", - tags=list(base_tags), - source=source, - confidence=match.confidence, - ) - candidates.append(candidate) - - # Dedup against existing - if deduplicate and existing: - candidates = [c for c in candidates if not _is_duplicate(c, existing)] - - return candidates - - -# =========================================================================== -# Discussion / Transcript Capture -# =========================================================================== - -# Patterns for classifying discussion statements -_DISCUSSION_PATTERNS: list[tuple[str, str, str]] = [ - # Decisions - (r"(?:we\s+)?decided\s+(?:to\s+)?(.+)", "dec", "high"), - (r"(?:the\s+)?decision\s+is\s+(?:to\s+)?(.+)", "dec", "high"), - ( - r"(?:let'?s|we\s+should|we\s+will|we'?ll)\s+(?:go\s+with\s+|use\s+|adopt\s+)(.+)", - "dec", - "medium", - ), - (r"(?:I'?m\s+going\s+with|going\s+with|choosing)\s+(.+)", "dec", "medium"), - # Preferences - (r"I\s+prefer\s+(.+)", "pref", "high"), - (r"(?:let'?s|we\s+should)\s+(?:always|prefer|stick\s+with|keep)\s+(.+)", "pref", "medium"), - (r"(?:convention|standard|style):\s*(.+)", "pref", "medium"), - (r"(?:use|prefer)\s+(\S+)\s+(?:over|instead\s+of)\s+(\S+)", "pref", "medium"), - # Goals - (r"(?:the\s+)?goal\s+(?:is\s+)?(?:to\s+)?(.+)", "goal", "high"), - (r"we\s+(?:need|want|aim|plan)\s+to\s+(.+)", "goal", "medium"), - (r"(?:TODO|FIXME|HACK):\s*(.+)", "goal", "medium"), - (r"next\s+(?:step|priority|milestone):\s*(.+)", "goal", "medium"), - # Facts - (r"(?:it\s+)?turns?\s+out\s+(?:that\s+)?(.+)", "fact", "medium"), - (r"(?:TIL|FYI|note|important):\s*(.+)", "fact", "medium"), - ( - r"(?:the\s+)?(?:API|endpoint|service|server)\s+(?:is|uses|runs|supports)\s+(.+)", - "fact", - "medium", - ), - # Risks - (r"(?:risk|warning|careful|watch\s+out|danger):\s*(.+)", "risk", "high"), - (r"(?:this\s+)?(?:might|could|may)\s+(?:break|fail|cause)\s+(.+)", "risk", "medium"), - # Questions - (r"(?:should\s+we|do\s+we\s+need\s+to|how\s+(?:do|should)\s+we)\s+(.+)\??", "q", "medium"), - (r"(?:open\s+question|TBD|to\s+be\s+decided):\s*(.+)", "q", "medium"), -] - - -# Confidence ranking for tie-breaking -_CONFIDENCE_RANK = {"high": 2, "medium": 1, "low": 0} - - -def _classify_statement(text: str) -> tuple[str, str, str] | None: - """Classify a statement into a memory type. - - Evaluates all matching patterns and returns the highest-confidence - classification. Returns (type, extracted_title, confidence) or None - if no match. - """ - text_stripped = text.strip() - best: tuple[str, str, str] | None = None - best_rank = -1 - for pattern, mem_type, confidence in _DISCUSSION_PATTERNS: - m = re.search(pattern, text_stripped, re.IGNORECASE) - if m: - title = m.group(1).strip() - # Handle special case for "use X over Y" → "Prefer X over Y" - if mem_type == "pref" and len(m.groups()) >= 2: - title = f"{m.group(1)} over {m.group(2)}" - # Clean title - title = re.sub(r"\s+", " ", title) - title = title.rstrip(".") - if len(title) < 5: - continue - rank = _CONFIDENCE_RANK.get(confidence, 0) - if rank > best_rank: - best = (mem_type, title[:120], confidence) - best_rank = rank - return best - - -def capture_from_discussion( - workspace: Path, - transcript: str, - source: str = "discussion", - tags: list[str] | None = None, - deduplicate: bool = True, -) -> list[MemoryCandidate]: - """Extract memory candidates from a discussion transcript. - - Parses free-text conversation and identifies decisions, preferences, - goals, facts, risks, and open questions based on linguistic patterns. - - Parameters - ---------- - workspace - Path to the memory workspace (for dedup against existing). - transcript - Raw text of the discussion/conversation. - source - Source label for provenance (e.g. ``"slack:2026-02-10"``). - tags - Additional tags to apply to all candidates. - deduplicate - If True, filter out candidates that match existing memories. - - Returns - ------- - list[MemoryCandidate] - Candidate memories ready for review and optional insertion. - """ - base_tags = ["topic:discussion"] - if tags: - base_tags.extend(t for t in tags if t not in base_tags) - - # Load existing for dedup - existing: dict[str, Any] = {} - if deduplicate: - try: - existing = load_needs(workspace) - except (SystemExit, Exception): - existing = {} - - candidates: list[MemoryCandidate] = [] - seen_titles: set[str] = set() - - # Process line by line and also try multi-line sentences - lines = transcript.splitlines() - for line in lines: - line = line.strip() - if not line or len(line) < 10: - continue - - # Strip common prefixes: "> quote", "- list", "* list", "User:", timestamps - cleaned = re.sub(r"^(?:[>*\-]\s*|\d{1,2}:\d{2}\s*|[\w]+:\s*)", "", line).strip() - if not cleaned or len(cleaned) < 10: - continue - - result = _classify_statement(cleaned) - if result is None: - continue - - mem_type, title, confidence = result - - # Dedup within same transcript - title_lower = title.lower() - if title_lower in seen_titles: - continue - seen_titles.add(title_lower) - - candidate = MemoryCandidate( - type=mem_type, - title=title, - body=cleaned[:500], - tags=list(base_tags), - source=source, - confidence=confidence, - ) - candidates.append(candidate) - - # Dedup against existing - if deduplicate and existing: - candidates = [c for c in candidates if not _is_duplicate(c, existing)] - - return candidates diff --git a/src/ai_memory_protocol/cli.py b/src/ai_memory_protocol/cli.py index 31b2d2e..c37ef18 100644 --- a/src/ai_memory_protocol/cli.py +++ b/src/ai_memory_protocol/cli.py @@ -27,8 +27,7 @@ from pathlib import Path from . import __version__ -from .capture import capture_from_ci, capture_from_discussion, capture_from_git, format_candidates -from .config import TYPE_FILES +from .config import TYPE_DEFAULT_CONFIDENCE, TYPE_FILES from .engine import ( expand_graph, find_workspace, @@ -38,9 +37,7 @@ tag_match, text_match, ) -from .executor import actions_from_json, execute_plan from .formatter import format_brief, format_compact, format_context_pack, format_full -from .planner import format_plan, run_plan from .rst import ( add_tags_in_rst, append_to_rst, @@ -48,110 +45,12 @@ generate_id, generate_rst_directive, remove_tags_in_rst, + update_body_in_rst, update_field_in_rst, + update_title_in_rst, ) from .scaffold import init_workspace -# --------------------------------------------------------------------------- -# Doctor checks -# --------------------------------------------------------------------------- - - -def _check_cli() -> tuple[bool, str]: - """Verify CLI entry point works.""" - from . import __version__ - - return True, f"v{__version__}" - - -def _check_workspace(workspace_dir: str | None) -> tuple[bool, str]: - """Verify workspace exists and is valid.""" - try: - ws = find_workspace(workspace_dir) - return True, str(ws) - except SystemExit as e: - return False, f"{e} — Run: memory init " - - -def _check_sphinx_build(workspace_dir: str | None) -> tuple[bool, str]: - """Verify sphinx-build is discoverable.""" - from .engine import find_sphinx_build - - try: - ws = find_workspace(workspace_dir) - except SystemExit: - return False, "Workspace not found (skipped)" - try: - sb = find_sphinx_build(ws) - return True, sb - except FileNotFoundError as e: - return False, f"Not found — {e}" - - -def _check_needs_json(workspace_dir: str | None) -> tuple[bool, str]: - """Verify needs.json is loadable.""" - from .engine import find_needs_json - - try: - ws = find_workspace(workspace_dir) - except SystemExit: - return False, "Workspace not found (skipped)" - path = find_needs_json(ws) - if not path.exists(): - return False, f"Not found at {path} — Run: memory rebuild" - try: - needs = load_needs(ws) - return True, f"{len(needs)} memories loaded" - except (SystemExit, Exception) as e: - return False, f"Failed to load: {e}" - - -def _check_mcp_importable() -> tuple[bool, str]: - """Verify MCP SDK is installed.""" - try: - import mcp # noqa: F401 - - return True, f"v{getattr(mcp, '__version__', '?')}" - except ImportError: - return False, "Not installed — Run: pipx inject ai-memory-protocol mcp" - - -def _check_mcp_server() -> tuple[bool, str]: - """Verify MCP server can be created.""" - try: - from .mcp_server import create_mcp_server - - create_mcp_server() - return True, "Server created successfully" - except ImportError as e: - return False, f"MCP SDK missing: {e}" - except Exception as e: - return False, f"Failed: {e}" - - -def _check_rst_files(workspace_dir: str | None) -> tuple[bool, str]: - """Verify RST files exist and are parseable.""" - try: - ws = find_workspace(workspace_dir) - except SystemExit: - return False, "Workspace not found (skipped)" - memory_dir = ws / "memory" - if not memory_dir.exists(): - return False, f"No memory/ directory in {ws}" - rst_files = list(memory_dir.glob("*.rst")) - if not rst_files: - return False, "No RST files found in memory/" - errors = [] - for f in rst_files: - try: - f.read_text() - except Exception as e: - errors.append(f"{f.name}: {e}") - if errors: - return False, f"{len(errors)} unreadable files: {'; '.join(errors)}" - return True, f"{len(rst_files)} RST files OK" - - # --------------------------------------------------------------------------- # Subcommands # --------------------------------------------------------------------------- @@ -168,38 +67,6 @@ def cmd_init(args: argparse.Namespace) -> None: ) -def cmd_doctor(args: argparse.Namespace) -> None: - """Run installation health checks.""" - ws_dir = getattr(args, "dir", None) - checks = [ - ("CLI entry point", _check_cli), - ("Workspace exists", lambda: _check_workspace(ws_dir)), - ("Sphinx-build available", lambda: _check_sphinx_build(ws_dir)), - ("needs.json loadable", lambda: _check_needs_json(ws_dir)), - ("MCP SDK installed", _check_mcp_importable), - ("MCP server creatable", _check_mcp_server), - ("RST files parseable", lambda: _check_rst_files(ws_dir)), - ] - all_ok = True - print("AI Memory Protocol — Health Check\n") - for name, check_fn in checks: - try: - ok, detail = check_fn() - status = "\u2713" if ok else "\u2717" - print(f" {status} {name}: {detail}") - if not ok: - all_ok = False - except Exception as e: - print(f" \u2717 {name}: CRASH — {e}") - all_ok = False - print() - if all_ok: - print("All checks passed.") - else: - print("Some checks failed. See details above.") - sys.exit(1) - - def cmd_add(args: argparse.Namespace) -> None: """Add a new memory entry.""" workspace = find_workspace(args.dir) @@ -213,7 +80,7 @@ def cmd_add(args: argparse.Namespace) -> None: need_id=args.id, tags=tags, source=args.source, - confidence=args.confidence, + confidence=args.confidence or TYPE_DEFAULT_CONFIDENCE.get(args.type, "medium"), scope=args.scope, owner=args.owner, body=args.body, @@ -358,9 +225,20 @@ def cmd_update(args: argparse.Namespace) -> None: print(msg) any_change = any_change or ok + if getattr(args, "body", None): + ok, msg = update_body_in_rst(workspace, need_id, args.body) + print(msg) + any_change = any_change or ok + + if getattr(args, "title", None): + ok, msg = update_title_in_rst(workspace, need_id, args.title) + print(msg) + any_change = any_change or ok + if not any_change: print("No changes made. Specify at least one field to update.") - print(" --status, --confidence, --scope, --review-after, --add-tags, --remove-tags") + print(" --status, --confidence, --scope, --review-after, --source, --owner") + print(" --add-tags, --remove-tags, --body, --title") return print("Run 'memory rebuild' to update needs.json") @@ -475,173 +353,6 @@ def cmd_rebuild(args: argparse.Namespace) -> None: sys.exit(1) -def cmd_plan(args: argparse.Namespace) -> None: - """Analyze memory graph and generate a maintenance plan.""" - workspace = find_workspace(args.dir) - checks = [c.strip() for c in args.checks.split(",")] if args.checks else None - actions = run_plan(workspace, checks=checks) - fmt = args.format - print(format_plan(actions, fmt=fmt)) - - -def cmd_apply(args: argparse.Namespace) -> None: - """Execute a list of planned actions from a JSON file.""" - workspace = find_workspace(args.dir) - - if args.file: - import json as json_mod - - data = json_mod.loads(Path(args.file).read_text()) - actions = actions_from_json(data) - elif args.plan: - # Run plan first, then apply - checks = [c.strip() for c in args.plan.split(",")] if args.plan != "all" else None - actions_list = run_plan(workspace, checks=checks) - if not actions_list: - print("No issues found — nothing to apply.") - return - print(format_plan(actions_list, fmt="human")) - if not args.yes: - answer = input(f"\nApply {len(actions_list)} action(s)? [y/N] ") - if answer.lower() not in ("y", "yes"): - print("Aborted.") - return - actions = actions_list - else: - print("Provide --file or --plan [checks] to generate and apply.") - sys.exit(1) - - result = execute_plan( - workspace, - actions, - auto_commit=args.auto_commit, - rebuild=not args.no_rebuild, - ) - print(result.summary()) - if not result.success: - sys.exit(1) - - -def cmd_capture(args: argparse.Namespace) -> None: - """Capture memory candidates from external sources.""" - workspace = find_workspace(args.dir) - - if args.source == "git": - repo_path = Path(args.repo).resolve() if args.repo else Path.cwd() - candidates = capture_from_git( - workspace=workspace, - repo_path=repo_path, - since=args.since, - until=args.until, - repo_name=args.repo_name, - min_confidence=args.min_confidence, - ) - print(format_candidates(candidates, fmt=args.format)) - - if args.auto_add and candidates: - from .rst import append_to_rst, generate_rst_directive - - count = 0 - for c in candidates: - directive = generate_rst_directive( - mem_type=c.type, - title=c.title, - tags=c.tags, - source=c.source, - confidence=c.confidence, - scope=c.scope, - body=c.body, - ) - append_to_rst(workspace, c.type, directive) - count += 1 - print(f"\nAdded {count} memories to workspace.") - if not args.no_rebuild: - success, message = run_rebuild(workspace) - print(message) - elif args.source == "ci": - log_text = _read_capture_input(args.input) - if log_text is None: - print("Provide CI log via --input or pipe to stdin.") - sys.exit(1) - extra_tags = ( - [t.strip() for t in args.extra_tags.split(",") if t.strip()] - if args.extra_tags - else None - ) - candidates = capture_from_ci( - workspace=workspace, - log_text=log_text, - source=args.source_label or "ci-log", - tags=extra_tags, - ) - print(format_candidates(candidates, fmt=args.format)) - _auto_add_candidates(workspace, candidates, args) - elif args.source == "discussion": - transcript = _read_capture_input(args.input) - if transcript is None: - print("Provide transcript via --input or pipe to stdin.") - sys.exit(1) - extra_tags = ( - [t.strip() for t in args.extra_tags.split(",") if t.strip()] - if args.extra_tags - else None - ) - candidates = capture_from_discussion( - workspace=workspace, - transcript=transcript, - source=args.source_label or "discussion", - tags=extra_tags, - ) - print(format_candidates(candidates, fmt=args.format)) - _auto_add_candidates(workspace, candidates, args) - else: - print(f"Unknown capture source: {args.source}") - print("Supported sources: git, ci, discussion") - sys.exit(1) - - -def _read_capture_input(input_path: str | None) -> str | None: - """Read capture input from file, stdin, or return None.""" - if input_path: - path = Path(input_path) - if path.exists(): - return path.read_text() - print(f"File not found: {input_path}") - return None - if not sys.stdin.isatty(): - return sys.stdin.read() - return None - - -def _auto_add_candidates( - workspace: Path, - candidates: list, - args: argparse.Namespace, -) -> None: - """Add candidates to workspace if --auto-add flag is set.""" - if not getattr(args, "auto_add", False) or not candidates: - return - from .rst import append_to_rst, generate_rst_directive - - count = 0 - for c in candidates: - directive = generate_rst_directive( - mem_type=c.type, - title=c.title, - tags=c.tags, - source=c.source, - confidence=c.confidence, - scope=c.scope, - body=c.body, - ) - append_to_rst(workspace, c.type, directive) - count += 1 - print(f"\nAdded {count} memories to workspace.") - if not getattr(args, "no_rebuild", False): - success, message = run_rebuild(workspace) - print(message) - - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -755,7 +466,11 @@ def build_parser() -> argparse.ArgumentParser: p_add.add_argument("--id", help="Custom ID (auto-generated if omitted)") p_add.add_argument("--tags", help="Tags, comma-separated (use prefix:value format)") p_add.add_argument("--source", default="", help="Provenance (URL, commit, description)") - p_add.add_argument("--confidence", default="medium", choices=["low", "medium", "high"]) + p_add.add_argument( + "--confidence", + choices=["low", "medium", "high"], + help="Trust level (defaults by type: fact/dec/pref=high, mem/risk/goal=medium, q=low)", + ) p_add.add_argument("--scope", default="global", help="Scope: global, repo:X, product:X") p_add.add_argument("--owner", default="", help="Owner (@username)") p_add.add_argument("--body", default="", help="Description text") @@ -825,6 +540,8 @@ def build_parser() -> argparse.ArgumentParser: p_update.add_argument("--owner", help="New owner") p_update.add_argument("--add-tags", help="Tags to add, comma-separated") p_update.add_argument("--remove-tags", help="Tags to remove, comma-separated") + p_update.add_argument("--body", help="New body text (replaces entire description)") + p_update.add_argument("--title", help="New title (replaces directive title)") p_update.set_defaults(func=cmd_update) # --- deprecate --- @@ -850,118 +567,6 @@ def build_parser() -> argparse.ArgumentParser: p_rebuild = sub.add_parser("rebuild", help="Rebuild needs.json from RST sources") p_rebuild.set_defaults(func=cmd_rebuild) - # --- doctor --- - p_doctor = sub.add_parser("doctor", help="Run installation health checks") - p_doctor.set_defaults(func=cmd_doctor) - - # --- plan --- - p_plan = sub.add_parser("plan", help="Analyze memory graph and generate maintenance plan") - p_plan.add_argument( - "--checks", - help=( - "Comma-separated checks to run. " - "Options: duplicates, missing_tags, stale, conflicts, tag_normalize, split_files. " - "Default: all." - ), - ) - p_plan.add_argument( - "--format", - "-f", - choices=["human", "json"], - default="human", - help="Output format (default: human)", - ) - p_plan.set_defaults(func=cmd_plan) - - # --- apply --- - p_apply = sub.add_parser("apply", help="Execute planned maintenance actions") - p_apply.add_argument("--file", help="JSON file containing actions to apply") - p_apply.add_argument( - "--plan", - nargs="?", - const="all", - help="Run plan first, then apply. Optionally specify checks (comma-separated).", - ) - p_apply.add_argument( - "--auto-commit", - action="store_true", - help="Commit changes to git after successful apply", - ) - p_apply.add_argument( - "--no-rebuild", - action="store_true", - help="Skip Sphinx rebuild after applying", - ) - p_apply.add_argument( - "-y", - "--yes", - action="store_true", - help="Skip confirmation prompt when using --plan", - ) - p_apply.set_defaults(func=cmd_apply) - - # --- capture --- - p_capture = sub.add_parser("capture", help="Capture memories from external sources") - p_capture.add_argument( - "source", - choices=["git", "ci", "discussion"], - help="Capture source type", - ) - p_capture.add_argument( - "--repo", - help="Path to git repository (default: current directory, git only)", - ) - p_capture.add_argument( - "--repo-name", - help="Repository name for repo: tags (auto-detected from path if omitted, git only)", - ) - p_capture.add_argument( - "--since", - default="HEAD~20", - help="Start of git range (commit ref or date like '2 weeks ago'). Default: HEAD~20", - ) - p_capture.add_argument( - "--until", - default="HEAD", - help="End of git range. Default: HEAD", - ) - p_capture.add_argument( - "--min-confidence", - choices=["low", "medium", "high"], - default="low", - help="Minimum confidence to include (default: low)", - ) - p_capture.add_argument( - "--format", - "-f", - choices=["human", "json"], - default="human", - help="Output format (default: human)", - ) - p_capture.add_argument( - "--auto-add", - action="store_true", - help="Automatically add candidates to workspace (skip review)", - ) - p_capture.add_argument( - "--no-rebuild", - action="store_true", - help="Skip rebuild after auto-add", - ) - p_capture.add_argument( - "--input", - help="Input file for ci/discussion capture (reads stdin if omitted and not a TTY)", - ) - p_capture.add_argument( - "--source-label", - help="Source provenance label (e.g. 'ci:github-actions:run-123', 'slack:2026-02-10')", - ) - p_capture.add_argument( - "--extra-tags", - help="Extra tags for ci/discussion candidates, comma-separated", - ) - p_capture.set_defaults(func=cmd_capture) - return parser diff --git a/src/ai_memory_protocol/config.py b/src/ai_memory_protocol/config.py index 331f1b5..72eece7 100644 --- a/src/ai_memory_protocol/config.py +++ b/src/ai_memory_protocol/config.py @@ -46,6 +46,16 @@ "q": "active", } +TYPE_DEFAULT_CONFIDENCE: dict[str, str] = { + "mem": "medium", + "dec": "high", + "fact": "high", + "pref": "high", + "risk": "medium", + "goal": "medium", + "q": "low", +} + LINK_FIELDS: list[str] = [ "relates", "supports", diff --git a/src/ai_memory_protocol/executor.py b/src/ai_memory_protocol/executor.py deleted file mode 100644 index a71b5dc..0000000 --- a/src/ai_memory_protocol/executor.py +++ /dev/null @@ -1,448 +0,0 @@ -"""Execute planned maintenance actions against the memory workspace. - -The executor takes a list of ``Action`` objects (from ``planner.py``) -and applies them sequentially using existing ``rst.py`` functions. -Includes git-based rollback on build failure. - -Usage: - from ai_memory_protocol.executor import execute_plan - result = execute_plan(workspace, actions, auto_commit=False) -""" - -from __future__ import annotations - -import subprocess -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any - -from .engine import run_rebuild -from .planner import Action -from .rst import ( - add_tags_in_rst, - append_to_rst, - deprecate_in_rst, - generate_rst_directive, - remove_tags_in_rst, - update_field_in_rst, -) - -# --------------------------------------------------------------------------- -# Result type -# --------------------------------------------------------------------------- - - -@dataclass -class ExecutionResult: - """Result of executing a plan.""" - - success: bool - applied: list[dict[str, Any]] = field(default_factory=list) - failed: list[dict[str, Any]] = field(default_factory=list) - skipped: list[dict[str, Any]] = field(default_factory=list) - build_output: str = "" - message: str = "" - - def to_dict(self) -> dict[str, Any]: - return { - "success": self.success, - "applied_count": len(self.applied), - "failed_count": len(self.failed), - "skipped_count": len(self.skipped), - "applied": self.applied, - "failed": self.failed, - "skipped": self.skipped, - "build_output": self.build_output, - "message": self.message, - } - - def summary(self) -> str: - parts = [self.message] if self.message else [] - parts.append( - f"Applied: {len(self.applied)}, " - f"Failed: {len(self.failed)}, " - f"Skipped: {len(self.skipped)}" - ) - if self.build_output: - parts.append(f"Build: {self.build_output[:200]}") - return "\n".join(parts) - - -# --------------------------------------------------------------------------- -# Validation -# --------------------------------------------------------------------------- - - -def validate_actions(actions: list[Action]) -> tuple[list[Action], list[dict[str, Any]]]: - """Validate actions before execution. - - Returns (valid_actions, skipped_with_reasons). - Checks: - - Circular supersedes (A supersedes B, B supersedes A) - - Missing required fields per action kind - """ - valid: list[Action] = [] - skipped: list[dict[str, Any]] = [] - - # Build supersede graph for cycle detection - supersede_map: dict[str, str] = {} - for a in actions: - if a.kind == "SUPERSEDE" and a.old_id and a.by_id: - supersede_map[a.old_id] = a.by_id - - for a in actions: - # Check required fields - if a.kind == "RETAG" and not a.id: - skipped.append({"action": a.to_dict(), "reason": "RETAG requires 'id'"}) - continue - if a.kind == "SUPERSEDE" and not a.old_id: - skipped.append({"action": a.to_dict(), "reason": "SUPERSEDE requires 'old_id'"}) - continue - if a.kind == "DEPRECATE" and not a.id: - skipped.append({"action": a.to_dict(), "reason": "DEPRECATE requires 'id'"}) - continue - if a.kind == "UPDATE" and not a.id: - skipped.append({"action": a.to_dict(), "reason": "UPDATE requires 'id'"}) - continue - if a.kind == "SPLIT_FILE" and not a.rst_path: - skipped.append({"action": a.to_dict(), "reason": "SPLIT_FILE requires 'rst_path'"}) - continue - - # Check supersede cycles - if a.kind == "SUPERSEDE" and a.old_id: - visited: set[str] = set() - current = a.old_id - cycle = False - while current in supersede_map: - if current in visited: - cycle = True - break - visited.add(current) - current = supersede_map[current] - if cycle: - skipped.append( - { - "action": a.to_dict(), - "reason": f"Circular supersede chain involving {a.old_id}", - } - ) - continue - - valid.append(a) - - return valid, skipped - - -# --------------------------------------------------------------------------- -# Individual action executors -# --------------------------------------------------------------------------- - - -def _execute_retag(workspace: Path, action: Action) -> tuple[bool, str]: - """Execute a RETAG action — add/remove tags on a memory.""" - messages: list[str] = [] - ok = True - - if action.remove_tags: - success, msg = remove_tags_in_rst(workspace, action.id, action.remove_tags) - messages.append(msg) - ok = ok and success - - if action.add_tags: - success, msg = add_tags_in_rst(workspace, action.id, action.add_tags) - messages.append(msg) - ok = ok and success - - return ok, "; ".join(messages) - - -def _execute_supersede(workspace: Path, action: Action) -> tuple[bool, str]: - """Execute a SUPERSEDE action — deprecate old, optionally create new.""" - messages: list[str] = [] - - # Deprecate the old memory - ok, msg = deprecate_in_rst(workspace, action.old_id, action.by_id) - messages.append(msg) - - if not ok: - return False, f"Failed to deprecate {action.old_id}: {msg}" - - # If new memory details provided, create it - if action.new_type and action.new_title: - directive = generate_rst_directive( - mem_type=action.new_type, - title=action.new_title, - tags=action.new_tags or [], - body=action.new_body or "", - supersedes=[action.old_id], - ) - target = append_to_rst(workspace, action.new_type, directive) - messages.append(f"Created replacement in {target.name}") - - return True, "; ".join(messages) - - -def _execute_deprecate(workspace: Path, action: Action) -> tuple[bool, str]: - """Execute a DEPRECATE action.""" - return deprecate_in_rst(workspace, action.id, action.by_id or None) - - -def _execute_update(workspace: Path, action: Action) -> tuple[bool, str]: - """Execute an UPDATE action — change metadata fields.""" - if not action.field_changes: - return True, f"No field changes for {action.id}" - - messages: list[str] = [] - all_ok = True - - for field_name, value in action.field_changes.items(): - ok, msg = update_field_in_rst(workspace, action.id, field_name, value) - messages.append(msg) - all_ok = all_ok and ok - - return all_ok, "; ".join(messages) - - -def _execute_prune(workspace: Path, action: Action) -> tuple[bool, str]: - """Execute a PRUNE action — deprecate without replacement.""" - return deprecate_in_rst(workspace, action.id) - - -def _execute_split_file(workspace: Path, action: Action) -> tuple[bool, str]: # noqa: ARG001 - """Execute a SPLIT_FILE action. - - This is informational — actual splitting happens automatically - via rst.py append_to_rst when MAX_ENTRIES_PER_FILE is exceeded. - """ - return True, ( - f"File splitting noted for {action.rst_path} — handled automatically on next append." - ) - - -# Dispatcher -_EXECUTORS = { - "RETAG": _execute_retag, - "SUPERSEDE": _execute_supersede, - "DEPRECATE": _execute_deprecate, - "UPDATE": _execute_update, - "PRUNE": _execute_prune, - "SPLIT_FILE": _execute_split_file, -} - - -# --------------------------------------------------------------------------- -# Git operations for rollback -# --------------------------------------------------------------------------- - - -def _git_stash_push(workspace: Path) -> bool: - """Stash uncommitted changes for rollback. Returns True if stash was created.""" - try: - result = subprocess.run( - ["git", "stash", "push", "-m", "memory_apply pre-backup"], - cwd=str(workspace), - capture_output=True, - text=True, - ) - # Only treat as successful if git exited cleanly. - if result.returncode != 0: - return False - # "No local changes to save" means nothing was stashed. - # This may appear in stdout or stderr. - output = (result.stdout or "") + (result.stderr or "") - return "No local changes to save" not in output - except OSError: - return False - - -def _git_stash_pop(workspace: Path) -> bool: - """Pop stashed changes to rollback.""" - try: - result = subprocess.run( - ["git", "stash", "pop"], - cwd=str(workspace), - capture_output=True, - text=True, - ) - return result.returncode == 0 - except OSError: - return False - - -def _git_stash_drop(workspace: Path) -> bool: - """Drop the stash (cleanup after successful apply).""" - try: - result = subprocess.run( - ["git", "stash", "drop"], - cwd=str(workspace), - capture_output=True, - text=True, - ) - return result.returncode == 0 - except OSError: - return False - - -def _git_commit(workspace: Path, message: str) -> bool: - """Stage and commit memory changes.""" - try: - subprocess.run( - ["git", "add", "memory/", "*.rst"], - cwd=str(workspace), - capture_output=True, - text=True, - ) - result = subprocess.run( - ["git", "commit", "-m", message], - cwd=str(workspace), - capture_output=True, - text=True, - ) - return result.returncode == 0 - except OSError: - return False - - -# --------------------------------------------------------------------------- -# Main execution entry point -# --------------------------------------------------------------------------- - - -def execute_plan( - workspace: Path, - actions: list[Action], - auto_commit: bool = False, - rebuild: bool = True, -) -> ExecutionResult: - """Execute a list of planned actions. - - Parameters - ---------- - workspace - Path to the memory workspace. - actions - Actions to execute (from ``run_plan`` or deserialized from JSON). - auto_commit - If True, commit changes to git after successful execution. - rebuild - If True, run Sphinx rebuild after applying actions. - - Returns - ------- - ExecutionResult - Summary of applied/failed/skipped actions + build output. - """ - # Validate - valid_actions, skipped = validate_actions(actions) - - if not valid_actions: - return ExecutionResult( - success=True, - skipped=skipped, - message="No valid actions to execute.", - ) - - # Stash for rollback - stashed = _git_stash_push(workspace) - - # Execute sequentially - applied: list[dict[str, Any]] = [] - failed: list[dict[str, Any]] = [] - - for action in valid_actions: - executor = _EXECUTORS.get(action.kind) - if not executor: - failed.append( - { - "action": action.to_dict(), - "error": f"Unknown action kind: {action.kind}", - } - ) - continue - - try: - ok, msg = executor(workspace, action) - entry = {"action": action.to_dict(), "message": msg} - if ok: - applied.append(entry) - else: - failed.append({**entry, "error": msg}) - except Exception as e: - failed.append({"action": action.to_dict(), "error": str(e)}) - - # Rebuild - build_output = "" - build_ok = True - if rebuild and applied: - build_ok, build_output = run_rebuild(workspace) - - # If build failed, always treat as unsuccessful; use git stash for rollback - # when available. - if not build_ok: - if stashed: - _git_stash_pop(workspace) - applied_result: list[dict[str, Any]] = [] - message = "Build failed after applying actions — rolled back via git stash pop." - else: - # No stash available: cannot automatically roll back workspace changes. - applied_result = applied - message = ( - "Build failed after applying actions — no git stash available for " - "rollback; workspace may be in an inconsistent state." - ) - - return ExecutionResult( - success=False, - applied=applied_result, - failed=failed, - skipped=skipped, - build_output=build_output, - message=message, - ) - - # Cleanup stash on success - if stashed: - _git_stash_drop(workspace) - - # Auto-commit - if auto_commit and applied: - kinds = set(a.get("action", {}).get("kind", "?") for a in applied) - msg = f"memory: auto-apply {', '.join(sorted(kinds))} ({len(applied)} actions)" - _git_commit(workspace, msg) - - all_succeeded = not failed - return ExecutionResult( - success=all_succeeded, - applied=applied, - failed=failed, - skipped=skipped, - build_output=build_output, - message=( - f"Plan executed: {len(applied)} applied, {len(failed)} failed, {len(skipped)} skipped." - ), - ) - - -def actions_from_json(data: list[dict[str, Any]]) -> list[Action]: - """Deserialize a list of action dicts (e.g. from JSON) into Action objects.""" - actions: list[Action] = [] - for d in data: - actions.append( - Action( - kind=d.get("kind", "UPDATE"), - reason=d.get("reason", ""), - id=d.get("id", ""), - add_tags=d.get("add_tags", []), - remove_tags=d.get("remove_tags", []), - field_changes=d.get("field_changes", {}), - old_id=d.get("old_id", ""), - new_type=d.get("new_type", ""), - new_title=d.get("new_title", ""), - new_body=d.get("new_body", ""), - new_tags=d.get("new_tags", []), - new_links=d.get("new_links", []), - by_id=d.get("by_id", ""), - rst_path=d.get("rst_path", ""), - ) - ) - return actions diff --git a/src/ai_memory_protocol/mcp_server.py b/src/ai_memory_protocol/mcp_server.py index 4444764..bff3a3b 100644 --- a/src/ai_memory_protocol/mcp_server.py +++ b/src/ai_memory_protocol/mcp_server.py @@ -32,6 +32,7 @@ TextContent = None # type: ignore[assignment,misc] Tool = None # type: ignore[assignment,misc] +from .config import TYPE_DEFAULT_CONFIDENCE # noqa: E402 from .engine import ( # noqa: E402 expand_graph, find_workspace, @@ -54,7 +55,9 @@ generate_id, generate_rst_directive, remove_tags_in_rst, + update_body_in_rst, update_field_in_rst, + update_title_in_rst, ) logger = logging.getLogger(__name__) @@ -217,9 +220,13 @@ def _build_tools() -> list: }, "confidence": { "type": "string", - "description": "Trust level.", + "description": ( + "Trust level: high = verified across 2+ sessions or confirmed by user; " + "medium = observed once, not yet verified; " + "low = hypothesis or guess. " + "Defaults vary by type: fact/dec/pref=high, mem/risk/goal=medium, q=low." + ), "enum": ["low", "medium", "high"], - "default": "medium", }, "source": { "type": "string", @@ -232,16 +239,31 @@ def _build_tools() -> list: }, "relates": { "type": "string", - "description": "Comma-separated IDs of related memories.", + "description": ( + "Comma-separated IDs of related memories. " + "Use only when no specific link type fits. " + "Prefer: supersedes (replaces), depends (requires), " + "supports (evidence for), contradicts (disagrees with), " + "example_of (concrete case of)." + ), }, "supersedes": { "type": "string", "description": "Comma-separated IDs that this memory supersedes.", }, + "owner": { + "type": "string", + "description": "Owner (@username).", + }, "id": { "type": "string", "description": "Custom memory ID. Auto-generated from type + title if omitted.", }, + "review_days": { + "type": "integer", + "description": "Days until review is due. Default 30.", + "default": 30, + }, "rebuild": { "type": "boolean", "description": "Auto-rebuild needs.json after adding. Default true.", @@ -254,8 +276,9 @@ def _build_tools() -> list: Tool( name="memory_update", description=( - "Update metadata on an existing memory. " - "Can change status, confidence, scope, tags, review date, etc." + "Update an existing memory's content or metadata. " + "Can change title, body text, status, confidence, scope, " + "tags, review date, and owner." ), inputSchema={ "type": "object", @@ -294,6 +317,18 @@ def _build_tools() -> list: "type": "string", "description": "Tags to remove, comma-separated.", }, + "body": { + "type": "string", + "description": "New body text. Replaces the entire description.", + }, + "title": { + "type": "string", + "description": "New title. Replaces the directive title.", + }, + "owner": { + "type": "string", + "description": "New owner for this memory.", + }, }, "required": ["id"], }, @@ -360,191 +395,6 @@ def _build_tools() -> list: "required": [], }, ), - Tool( - name="memory_plan", - description=( - "Analyze memory graph and return planned maintenance actions (no modifications). " - "Checks for duplicates, missing tags, stale entries, conflicts, tag normalization, " - "and oversized files. Returns a list of proposed actions." - ), - inputSchema={ - "type": "object", - "properties": { - "checks": { - "type": "array", - "items": { - "type": "string", - "enum": [ - "duplicates", - "missing_tags", - "stale", - "conflicts", - "tag_normalize", - "split_files", - "auto_summaries", - ], - }, - "description": "Which checks to run. Default: all.", - }, - "format": { - "type": "string", - "enum": ["human", "json"], - "default": "human", - "description": "Output format. 'json' for machine-readable actions.", - }, - }, - "required": [], - }, - ), - Tool( - name="memory_apply", - description=( - "Execute a list of planned memory actions, rebuild, and validate. " - "Includes git-based rollback on build failure. Pass actions from memory_plan output." - ), - inputSchema={ - "type": "object", - "properties": { - "actions": { - "type": "array", - "items": {"type": "object"}, - "description": "Actions from memory_plan output (JSON format).", - }, - "auto_commit": { - "type": "boolean", - "default": False, - "description": "Commit changes to git after successful apply.", - }, - }, - "required": ["actions"], - }, - ), - Tool( - name="memory_capture_git", - description=( - "Analyze git log and generate memory candidates from commit history. " - "Classifies commits by conventional commit prefix " - "and deduplicates against existing memories." - ), - inputSchema={ - "type": "object", - "properties": { - "repo_path": { - "type": "string", - "description": "Path to git repository. Default: current directory.", - }, - "since": { - "type": "string", - "default": "HEAD~20", - "description": "Start of range (commit ref or date like '2 weeks ago').", - }, - "until": { - "type": "string", - "default": "HEAD", - "description": "End of range.", - }, - "repo_name": { - "type": "string", - "description": "Repository name for repo: tags. Auto-detected if omitted.", - }, - "min_confidence": { - "type": "string", - "enum": ["low", "medium", "high"], - "default": "low", - "description": "Minimum confidence to include.", - }, - "format": { - "type": "string", - "enum": ["human", "json"], - "default": "human", - "description": "Output format.", - }, - "auto_add": { - "type": "boolean", - "default": False, - "description": "Automatically add candidates to workspace.", - }, - }, - "required": [], - }, - ), - Tool( - name="memory_capture_ci", - description=( - "Extract memory candidates from CI log output (test failures, build errors, " - "deprecation warnings, timeouts). Parses common CI patterns and generates " - "structured memories. Pass log text directly or reference a log file." - ), - inputSchema={ - "type": "object", - "properties": { - "log_text": { - "type": "string", - "description": "Raw CI log text to parse.", - }, - "source": { - "type": "string", - "default": "ci-log", - "description": "Source label (e.g. 'ci:github-actions:run-123').", - }, - "tags": { - "type": "string", - "description": "Extra tags, comma-separated. topic:ci is auto-added.", - }, - "format": { - "type": "string", - "enum": ["human", "json"], - "default": "human", - "description": "Output format.", - }, - "auto_add": { - "type": "boolean", - "default": False, - "description": "Automatically add candidates to workspace.", - }, - }, - "required": ["log_text"], - }, - ), - Tool( - name="memory_capture_discussion", - description=( - "Extract memory candidates from a discussion or conversation transcript. " - "Identifies decisions, preferences, goals, facts, risks, and open questions " - "from natural language patterns like 'we decided to...', 'I prefer...', " - "'the goal is...', 'should we...'." - ), - inputSchema={ - "type": "object", - "properties": { - "transcript": { - "type": "string", - "description": "Raw text of the discussion/conversation.", - }, - "source": { - "type": "string", - "default": "discussion", - "description": "Source label (e.g. 'slack:2026-02-10', 'meeting:standup').", - }, - "tags": { - "type": "string", - "description": "Extra tags, comma-separated. topic:discussion is auto-added.", - }, - "format": { - "type": "string", - "enum": ["human", "json"], - "default": "human", - "description": "Output format.", - }, - "auto_add": { - "type": "boolean", - "default": False, - "description": "Automatically add candidates to workspace.", - }, - }, - "required": ["transcript"], - }, - ), ] @@ -666,16 +516,6 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: return _handle_stale(arguments) elif name == "memory_rebuild": return _handle_rebuild(arguments) - elif name == "memory_plan": - return _handle_plan(arguments) - elif name == "memory_apply": - return _handle_apply(arguments) - elif name == "memory_capture_git": - return _handle_capture_git(arguments) - elif name == "memory_capture_ci": - return _handle_capture_ci(arguments) - elif name == "memory_capture_discussion": - return _handle_capture_discussion(arguments) else: return _text_response(f"Unknown tool: {name}") except SystemExit as e: @@ -772,11 +612,13 @@ def _handle_add(args: dict[str, Any]) -> list[TextContent]: need_id=args.get("id"), tags=tags, source=args.get("source", ""), - confidence=args.get("confidence", "medium"), + confidence=args.get("confidence") or TYPE_DEFAULT_CONFIDENCE.get(args["type"], "medium"), scope=args.get("scope", "global"), + owner=args.get("owner", ""), body=args.get("body", ""), relates=relates, supersedes=supersedes, + review_days=args.get("review_days", 30), ) target = append_to_rst(workspace, args["type"], directive) @@ -800,7 +642,7 @@ def _handle_update(args: dict[str, Any]) -> list[TextContent]: messages: list[str] = [] any_change = False - for field in ("status", "confidence", "scope", "review_after", "source"): + for field in ("status", "confidence", "scope", "review_after", "source", "owner"): value = args.get(field) if value is not None: ok, msg = update_field_in_rst(workspace, need_id, field, value) @@ -819,6 +661,16 @@ def _handle_update(args: dict[str, Any]) -> list[TextContent]: messages.append(msg) any_change = any_change or ok + if args.get("body"): + ok, msg = update_body_in_rst(workspace, need_id, args["body"]) + messages.append(msg) + any_change = any_change or ok + + if args.get("title"): + ok, msg = update_title_in_rst(workspace, need_id, args["title"]) + messages.append(msg) + any_change = any_change or ok + if not any_change: return _text_response("No changes made. Specify at least one field to update.") @@ -911,150 +763,6 @@ def _handle_rebuild(args: dict[str, Any]) -> list[TextContent]: return _text_response(result) -def _handle_plan(args: dict[str, Any]) -> list[TextContent]: - from .planner import format_plan, run_plan - - workspace = _get_workspace() - checks = args.get("checks") # list[str] or None - fmt = args.get("format", "human") - actions = run_plan(workspace, checks=checks) - output = format_plan(actions, fmt=fmt) - return _text_response(output) - - -def _handle_apply(args: dict[str, Any]) -> list[TextContent]: - from .executor import actions_from_json, execute_plan - - workspace = _get_workspace() - raw_actions = args.get("actions", []) - auto_commit = args.get("auto_commit", False) - actions = actions_from_json(raw_actions) - result = execute_plan(workspace, actions, auto_commit=auto_commit) - return _text_response(result.summary()) - - -def _handle_capture_git(args: dict[str, Any]) -> list[TextContent]: - from .capture import capture_from_git, format_candidates - from .rst import append_to_rst, generate_rst_directive - - workspace = _get_workspace() - repo_path = Path(args.get("repo_path", ".")).resolve() - candidates = capture_from_git( - workspace=workspace, - repo_path=repo_path, - since=args.get("since", "HEAD~20"), - until=args.get("until", "HEAD"), - repo_name=args.get("repo_name"), - min_confidence=args.get("min_confidence", "low"), - ) - - output_lines: list[str] = [] - fmt = args.get("format", "human") - output_lines.append(format_candidates(candidates, fmt=fmt)) - - if args.get("auto_add", False) and candidates: - count = 0 - for c in candidates: - directive = generate_rst_directive( - mem_type=c.type, - title=c.title, - tags=c.tags, - source=c.source, - confidence=c.confidence, - scope=c.scope, - body=c.body, - ) - append_to_rst(workspace, c.type, directive) - count += 1 - output_lines.append(f"\nAdded {count} memories to workspace.") - success, msg = run_rebuild(workspace) - output_lines.append(msg) - - return _text_response("\n".join(output_lines)) - - -def _handle_capture_ci(args: dict[str, Any]) -> list[TextContent]: - from .capture import capture_from_ci, format_candidates - from .rst import append_to_rst, generate_rst_directive - - workspace = _get_workspace() - log_text = args.get("log_text", "") - extra_tags = ( - [t.strip() for t in args["tags"].split(",") if t.strip()] if args.get("tags") else None - ) - candidates = capture_from_ci( - workspace=workspace, - log_text=log_text, - source=args.get("source", "ci-log"), - tags=extra_tags, - ) - - output_lines: list[str] = [] - fmt = args.get("format", "human") - output_lines.append(format_candidates(candidates, fmt=fmt)) - - if args.get("auto_add", False) and candidates: - count = 0 - for c in candidates: - directive = generate_rst_directive( - mem_type=c.type, - title=c.title, - tags=c.tags, - source=c.source, - confidence=c.confidence, - scope=c.scope, - body=c.body, - ) - append_to_rst(workspace, c.type, directive) - count += 1 - output_lines.append(f"\nAdded {count} memories to workspace.") - success, msg = run_rebuild(workspace) - output_lines.append(msg) - - return _text_response("\n".join(output_lines)) - - -def _handle_capture_discussion(args: dict[str, Any]) -> list[TextContent]: - from .capture import capture_from_discussion, format_candidates - from .rst import append_to_rst, generate_rst_directive - - workspace = _get_workspace() - transcript = args.get("transcript", "") - extra_tags = ( - [t.strip() for t in args["tags"].split(",") if t.strip()] if args.get("tags") else None - ) - candidates = capture_from_discussion( - workspace=workspace, - transcript=transcript, - source=args.get("source", "discussion"), - tags=extra_tags, - ) - - output_lines: list[str] = [] - fmt = args.get("format", "human") - output_lines.append(format_candidates(candidates, fmt=fmt)) - - if args.get("auto_add", False) and candidates: - count = 0 - for c in candidates: - directive = generate_rst_directive( - mem_type=c.type, - title=c.title, - tags=c.tags, - source=c.source, - confidence=c.confidence, - scope=c.scope, - body=c.body, - ) - append_to_rst(workspace, c.type, directive) - count += 1 - output_lines.append(f"\nAdded {count} memories to workspace.") - success, msg = run_rebuild(workspace) - output_lines.append(msg) - - return _text_response("\n".join(output_lines)) - - # --------------------------------------------------------------------------- # Entry points # --------------------------------------------------------------------------- diff --git a/src/ai_memory_protocol/planner.py b/src/ai_memory_protocol/planner.py deleted file mode 100644 index 95ce2f3..0000000 --- a/src/ai_memory_protocol/planner.py +++ /dev/null @@ -1,490 +0,0 @@ -"""Plan analysis — detect problems and generate maintenance actions. - -The planner is **read-only**: it loads needs.json + RST files, runs -detection algorithms, and returns a list of ``Action`` dicts describing -what *should* be done. The executor (``executor.py``) is responsible -for actually applying actions. - -Usage: - from ai_memory_protocol.planner import run_plan - actions = run_plan(workspace, checks=["duplicates", "missing_tags"]) -""" - -from __future__ import annotations - -from collections import defaultdict -from dataclasses import asdict, dataclass, field -from datetime import date, timedelta -from difflib import SequenceMatcher -from pathlib import Path -from typing import Any, Literal - -from .config import TYPE_FILES -from .engine import load_needs -from .rst import MAX_ENTRIES_PER_FILE, _count_entries, _find_all_rst_files - -# --------------------------------------------------------------------------- -# Action types -# --------------------------------------------------------------------------- - -ActionKind = Literal["RETAG", "SUPERSEDE", "DEPRECATE", "UPDATE", "PRUNE", "SPLIT_FILE"] - -ALL_CHECKS: list[str] = [ - "duplicates", - "missing_tags", - "stale", - "conflicts", - "tag_normalize", - "split_files", - "auto_summaries", -] - - -@dataclass -class Action: - """A planned maintenance action.""" - - kind: ActionKind - reason: str - # Fields used by individual action kinds - id: str = "" - add_tags: list[str] = field(default_factory=list) - remove_tags: list[str] = field(default_factory=list) - field_changes: dict[str, str] = field(default_factory=dict) - # SUPERSEDE-specific - old_id: str = "" - new_type: str = "" - new_title: str = "" - new_body: str = "" - new_tags: list[str] = field(default_factory=list) - new_links: list[str] = field(default_factory=list) - # DEPRECATE-specific - by_id: str = "" - # SPLIT_FILE-specific - rst_path: str = "" - - def to_dict(self) -> dict[str, Any]: - """Serialize to a plain dict, omitting empty fields.""" - d = asdict(self) - return {k: v for k, v in d.items() if v} - - -# --------------------------------------------------------------------------- -# Detection algorithms -# --------------------------------------------------------------------------- - - -def _active_needs(needs: dict[str, Any]) -> dict[str, Any]: - """Filter to non-deprecated needs only.""" - return {k: v for k, v in needs.items() if v.get("status") != "deprecated"} - - -def detect_duplicates( - needs: dict[str, Any], - title_threshold: float = 0.8, - tag_overlap_threshold: float = 0.5, -) -> list[Action]: - """Find near-duplicate memories by title similarity + tag overlap. - - Complexity: O(n²) — acceptable for n < 500. - """ - active = _active_needs(needs) - items = list(active.items()) - actions: list[Action] = [] - seen_pairs: set[tuple[str, str]] = set() - - for i, (id1, n1) in enumerate(items): - for id2, n2 in items[i + 1 :]: - pair = tuple(sorted((id1, id2))) - if pair in seen_pairs: - continue - - title_sim = SequenceMatcher( - None, n1.get("title", "").lower(), n2.get("title", "").lower() - ).ratio() - if title_sim < title_threshold: - continue - - tags1 = set(n1.get("tags", [])) - tags2 = set(n2.get("tags", [])) - union = tags1 | tags2 - if not union: - continue - tag_overlap = len(tags1 & tags2) / len(union) - if tag_overlap < tag_overlap_threshold: - continue - - seen_pairs.add(pair) - - # Prefer newer + higher-confidence as canonical - conf_rank = {"high": 2, "medium": 1, "low": 0} - score1 = ( - conf_rank.get(n1.get("confidence", "medium"), 1), - n1.get("created_at", ""), - ) - score2 = ( - conf_rank.get(n2.get("confidence", "medium"), 1), - n2.get("created_at", ""), - ) - - if score2 > score1: - old_id, new_id = id1, id2 - else: - old_id, new_id = id2, id1 - - actions.append( - Action( - kind="SUPERSEDE", - reason=( - f"Near-duplicate: title similarity {title_sim:.0%}, " - f"tag overlap {tag_overlap:.0%}. " - f"Keep {new_id} (higher score), deprecate {old_id}." - ), - old_id=old_id, - by_id=new_id, - ) - ) - - return actions - - -def detect_missing_tags(needs: dict[str, Any]) -> list[Action]: - """Find memories without required tag prefixes (topic: or repo:). - - O(n) — checks every active need once. - """ - active = _active_needs(needs) - actions: list[Action] = [] - - for nid, need in active.items(): - tags = need.get("tags", []) - has_topic = any(t.startswith("topic:") for t in tags) - has_repo = any(t.startswith("repo:") for t in tags) - missing: list[str] = [] - if not has_topic: - missing.append("topic:") - if not has_repo: - missing.append("repo:") - if missing: - actions.append( - Action( - kind="RETAG", - reason=f"Missing required tag prefix(es): {', '.join(missing)}", - id=nid, - ) - ) - - return actions - - -def detect_stale(needs: dict[str, Any]) -> list[Action]: - """Find expired or review-overdue memories. - - O(n) — mirrors the logic in ``cmd_stale`` but returns actions. - """ - active = _active_needs(needs) - today = date.today().isoformat() - actions: list[Action] = [] - - for nid, need in active.items(): - expires = need.get("expires_at", "") - review = need.get("review_after", "") - - if expires and expires <= today: - actions.append( - Action( - kind="UPDATE", - reason=f"Expired on {expires} — needs review or deprecation.", - id=nid, - field_changes={"status": "review"}, - ) - ) - elif review and review <= today: - actions.append( - Action( - kind="UPDATE", - reason=f"Review overdue since {review}.", - id=nid, - field_changes={"status": "review"}, - ) - ) - - return actions - - -def detect_conflicts(needs: dict[str, Any]) -> list[Action]: - """Find active needs with same topic but no contradicts link. - - Heuristic: two decisions on the same topic:* tag with no explicit - relationship may indicate an unrecorded contradiction. - - O(n²) per topic group — practical for small graphs. - """ - active = _active_needs(needs) - # Group decisions by topic tag - by_topic: dict[str, list[str]] = defaultdict(list) - for nid, need in active.items(): - if need.get("type") not in ("dec", "pref"): - continue - for tag in need.get("tags", []): - if tag.startswith("topic:"): - by_topic[tag].append(nid) - - actions: list[Action] = [] - seen: set[tuple[str, str]] = set() - - for _topic, ids in by_topic.items(): - if len(ids) < 2: - continue - for i, id1 in enumerate(ids): - for id2 in ids[i + 1 :]: - pair = tuple(sorted((id1, id2))) - if pair in seen: - continue - seen.add(pair) - - n1, n2 = active[id1], active[id2] - # Check if they already have a relationship - links1 = set() - links2 = set() - for lt in ("relates", "supports", "depends", "contradicts", "supersedes"): - links1.update(n1.get(lt, [])) - links2.update(n2.get(lt, [])) - - if id2 not in links1 and id1 not in links2: - actions.append( - Action( - kind="UPDATE", - reason=( - f"Potential conflict: {id1} and {id2} are both " - f"active {n1.get('type')}/{n2.get('type')} entries on " - f"the same topic with no explicit relationship link." - ), - id=id1, - field_changes={"status": "review"}, - ) - ) - - return actions - - -def detect_tag_normalization(needs: dict[str, Any]) -> list[Action]: - """Find case-insensitive tag duplicates and normalize to most common form. - - O(n) scan + O(types per lowered tag). - """ - active = _active_needs(needs) - # Collect all tag forms with usage counts - tag_usage: dict[str, int] = defaultdict(int) # exact form -> count - for need in active.values(): - for tag in need.get("tags", []): - tag_usage[tag] += 1 - - # Group by lowercased form - lower_groups: dict[str, set[str]] = defaultdict(set) - for tag in tag_usage: - lower_groups[tag.lower()].add(tag) - - actions: list[Action] = [] - for forms in lower_groups.values(): - if len(forms) <= 1: - continue - # Pick the most common form as canonical - canonical = max(forms, key=lambda t: tag_usage[t]) - non_canonical = forms - {canonical} - for form in non_canonical: - for nid, need in active.items(): - if form in need.get("tags", []): - actions.append( - Action( - kind="RETAG", - reason=f"Tag normalization: '{form}' → '{canonical}'", - id=nid, - remove_tags=[form], - add_tags=[canonical], - ) - ) - - return actions - - -def detect_auto_summaries( - needs: dict[str, Any], - min_count: int = 5, - min_age_days: int = 60, -) -> list[Action]: - """Find topics with many aged observations that could be consolidated. - - When a single ``topic:`` tag has *min_count* or more ``mem`` entries - all older than *min_age_days*, propose consolidating them into a - single ``fact``. - - O(n) — one pass over active needs, then per-topic grouping. - """ - active = _active_needs(needs) - cutoff = (date.today() - timedelta(days=min_age_days)).isoformat() - - # Group observations by topic tag (use sets to avoid duplicate IDs) - by_topic: dict[str, set[str]] = defaultdict(set) - for nid, need in active.items(): - if need.get("type") != "mem": - continue - created = need.get("created_at", "") - if not created or created > cutoff: - continue - for tag in need.get("tags", []): - if tag.startswith("topic:"): - by_topic[tag].add(nid) - - actions: list[Action] = [] - for topic_tag, ids in by_topic.items(): - if len(ids) < min_count: - continue - topic_value = topic_tag.split(":", 1)[1] - # Collect all tags from the group for the consolidated entry - all_tags: set[str] = set() - for nid in ids: - all_tags.update(active[nid].get("tags", [])) - - sorted_ids = sorted(ids) - actions.append( - Action( - kind="SUPERSEDE", - reason=( - f"Auto-summary: {len(ids)} observations on {topic_tag} " - f"older than {min_age_days} days. Consider consolidating " - f"into a single fact." - ), - old_id=",".join(sorted_ids), - new_type="fact", - new_title=f"Consolidated: {topic_value} observations", - new_tags=sorted(all_tags), - ) - ) - - return actions - - -def detect_split_files(workspace: Path) -> list[Action]: - """Find RST files that exceed MAX_ENTRIES_PER_FILE. - - O(files) — scans each RST file once. - """ - actions: list[Action] = [] - - for mem_type in TYPE_FILES: - for rst_path in _find_all_rst_files(workspace, mem_type): - count = _count_entries(rst_path) - if count > MAX_ENTRIES_PER_FILE: - actions.append( - Action( - kind="SPLIT_FILE", - reason=( - f"{rst_path.name} has {count} entries (limit: {MAX_ENTRIES_PER_FILE})." - ), - rst_path=str(rst_path), - ) - ) - - return actions - - -# --------------------------------------------------------------------------- -# Public interface -# --------------------------------------------------------------------------- - -# Map check names to detector functions -_DETECTORS: dict[str, Any] = { - "duplicates": lambda needs, ws: detect_duplicates(needs), - "missing_tags": lambda needs, ws: detect_missing_tags(needs), - "stale": lambda needs, ws: detect_stale(needs), - "conflicts": lambda needs, ws: detect_conflicts(needs), - "tag_normalize": lambda needs, ws: detect_tag_normalization(needs), - "split_files": lambda needs, ws: detect_split_files(ws), - "auto_summaries": lambda needs, ws: detect_auto_summaries(needs), -} - - -def run_plan( - workspace: Path, - checks: list[str] | None = None, - needs: dict[str, Any] | None = None, -) -> list[Action]: - """Run selected (or all) checks and return a unified list of actions. - - Parameters - ---------- - workspace - Path to the memory workspace (containing conf.py, memory/, etc.). - checks - Which checks to run. Defaults to all checks. - needs - Pre-loaded needs dict. If ``None``, loads from workspace. - - Returns - ------- - list[Action] - Ordered list of planned actions — not yet executed. - """ - if needs is None: - needs = load_needs(workspace) - - selected = checks or ALL_CHECKS - all_actions: list[Action] = [] - - for check in selected: - detector = _DETECTORS.get(check) - if detector is None: - raise ValueError( - f"Unknown check {check!r} requested. Available checks: {sorted(_DETECTORS.keys())}" - ) - # split_files only needs workspace, others need needs dict - actions = detector(needs, workspace) - all_actions.extend(actions) - - return all_actions - - -def format_plan(actions: list[Action], fmt: str = "human") -> str: - """Render a plan as human-readable text or JSON. - - Parameters - ---------- - fmt - ``"human"`` for readable text, ``"json"`` for machine-readable. - """ - if not actions: - return "No issues found — memory graph looks healthy." - - if fmt == "json": - import json - - return json.dumps([a.to_dict() for a in actions], indent=2, ensure_ascii=False) - - lines: list[str] = [f"## Memory Maintenance Plan — {len(actions)} action(s)\n"] - - # Group by kind - by_kind: dict[str, list[Action]] = defaultdict(list) - for a in actions: - by_kind[a.kind].append(a) - - for kind in ("SUPERSEDE", "DEPRECATE", "RETAG", "UPDATE", "PRUNE", "SPLIT_FILE"): - group = by_kind.get(kind, []) - if not group: - continue - lines.append(f"### {kind} ({len(group)})\n") - for a in group: - target = a.id or a.old_id or a.rst_path - lines.append(f" - **{target}**: {a.reason}") - if a.add_tags: - lines.append(f" + add tags: {', '.join(a.add_tags)}") - if a.remove_tags: - lines.append(f" - remove tags: {', '.join(a.remove_tags)}") - if a.field_changes: - for k, v in a.field_changes.items(): - lines.append(f" ~ {k} → {v}") - if a.by_id: - lines.append(f" → superseded by: {a.by_id}") - lines.append("") - - return "\n".join(lines) diff --git a/src/ai_memory_protocol/rst.py b/src/ai_memory_protocol/rst.py index 2f2afaf..f5b36d7 100644 --- a/src/ai_memory_protocol/rst.py +++ b/src/ai_memory_protocol/rst.py @@ -222,6 +222,102 @@ def deprecate_in_rst( return success, msg +def update_body_in_rst( + workspace: Path, + need_id: str, + new_body: str, +) -> tuple[bool, str]: + """Replace the body text of a need in its RST source file.""" + for mem_type in TYPE_FILES: + for rst_path in _find_all_rst_files(workspace, mem_type): + if not rst_path.exists(): + continue + content = rst_path.read_text() + if f":id: {need_id}" not in content: + continue + + lines = content.split("\n") + id_line_idx = None + for i, line in enumerate(lines): + if f":id: {need_id}" in line: + id_line_idx = i + break + if id_line_idx is None: + return False, f"Found file but could not locate :id: {need_id}" + + # Find end of metadata (last line starting with whitespace + colon) + body_start = id_line_idx + 1 + while body_start < len(lines) and re.match(r"\s+:\w", lines[body_start]): + body_start += 1 + + # Skip blank line between metadata and body + if body_start < len(lines) and lines[body_start].strip() == "": + body_start += 1 + + # Find end of body (next blank line or next directive or EOF) + body_end = body_start + while body_end < len(lines): + line = lines[body_end] + if line.strip() == "" and body_end > body_start: + break + if re.match(r"\.\. \w+::", line): + break + body_end += 1 + + # Build new body lines + new_body_lines = [] + for bline in textwrap.fill(new_body, width=72).split("\n"): + new_body_lines.append(f" {bline}") + + # Replace + lines[body_start:body_end] = new_body_lines + rst_path.write_text("\n".join(lines)) + + # Update the updated_at field + update_field_in_rst(workspace, need_id, "updated_at", date.today().isoformat()) + + return True, f"Updated body on {need_id} in {rst_path.name}" + + return False, f"Memory '{need_id}' not found in any RST file." + + +def update_title_in_rst( + workspace: Path, + need_id: str, + new_title: str, +) -> tuple[bool, str]: + """Replace the title of a need in its RST source file.""" + new_title = new_title.replace("\n", " ").replace("\r", " ").strip() + if not new_title: + return False, "Title cannot be empty." + for mem_type in TYPE_FILES: + for rst_path in _find_all_rst_files(workspace, mem_type): + if not rst_path.exists(): + continue + content = rst_path.read_text() + if f":id: {need_id}" not in content: + continue + + lines = content.split("\n") + for i, line in enumerate(lines): + if f":id: {need_id}" in line: + # Directive line is 1-2 lines above :id: + for j in range(max(0, i - 3), i): + if re.match(r"\.\. \w+::", lines[j]): + directive_type = lines[j].split("::")[0].strip() + lines[j] = f"{directive_type}:: {new_title}" + rst_path.write_text("\n".join(lines)) + update_field_in_rst( + workspace, need_id, "updated_at", date.today().isoformat() + ) + return True, ( + f"Updated title on {need_id} to '{new_title}' in {rst_path.name}" + ) + return False, f"Could not find directive line for {need_id}" + + return False, f"Memory '{need_id}' not found in any RST file." + + def add_tags_in_rst( workspace: Path, need_id: str, diff --git a/src/ai_memory_protocol/scaffold.py b/src/ai_memory_protocol/scaffold.py index 21d0a8c..f7a660c 100644 --- a/src/ai_memory_protocol/scaffold.py +++ b/src/ai_memory_protocol/scaffold.py @@ -219,6 +219,21 @@ def _write(path: Path, content: str) -> None: needs_build_json = True needs_id_regex = r"^[A-Za-z0-9_-]+" needs_role_need_template = "[{{title}}]({{id}})" + +# -------------------------------------------------------------------------- +# 6. Quality gates - warnings that fail build with -W flag +# -------------------------------------------------------------------------- +needs_warnings = {{ + "missing_topic_tag": "type in ['mem','dec','fact','pref','risk','goal','q'] and not any(t.startswith('topic:') for t in tags)", + "empty_body": "description == '' or description == 'TODO: Add description.'", + "deprecated_without_supersede": "status == 'deprecated' and len(supersedes_back) == 0", + "tag_case_mismatch": "any(t != t.lower() for t in tags)", + "missing_repo_tag": "type in ['mem','dec','fact'] and not any(t.startswith('repo:') for t in tags)", + "isolated_decision": "type == 'dec' and len(relates) == 0 and len(supersedes) == 0 and len(supersedes_back) == 0", + "draft_too_short": "status == 'draft' and (description == '' or len(description) < 20)", + "suspicious_high_confidence": "confidence == 'high' and type in ['mem', 'risk', 'goal', 'q']", + "isolated_memory": "status != 'draft' and (len(relates) + len(supersedes) + len(supersedes_back) + len(depends) + len(depends_back) + len(supports) + len(supports_back) + len(contradicts) + len(contradicts_back) + len(example_of) + len(example_of_back)) == 0", +}} """ _INDEX_RST = """\ diff --git a/tests/test_capture.py b/tests/test_capture.py deleted file mode 100644 index 03750d1..0000000 --- a/tests/test_capture.py +++ /dev/null @@ -1,744 +0,0 @@ -"""Tests for the capture module — git commit analysis and candidate generation.""" - -from __future__ import annotations - -from pathlib import Path -from unittest.mock import patch - -import pytest - -from ai_memory_protocol.capture import ( - MemoryCandidate, - _classify_commit, - _classify_statement, - _extract_scope, - _file_overlap, - _GitCommit, - _group_commits, - _infer_tags, - _is_duplicate, - _parse_ci_log, - capture_from_ci, - capture_from_discussion, - capture_from_git, - format_candidates, -) - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture -def fix_commit() -> _GitCommit: - return _GitCommit( - hash="abc12345", - subject="fix(gateway): resolve timeout issue", - body="The timeout was set too low.", - author="dev", - date="2026-01-15T10:00:00+00:00", - files=["src/gateway/server.cpp", "src/gateway/config.hpp"], - ) - - -@pytest.fixture -def feat_commit() -> _GitCommit: - return _GitCommit( - hash="def67890", - subject="feat(api): add health endpoint", - body="", - author="dev", - date="2026-01-16T10:00:00+00:00", - files=["src/api/health.cpp"], - ) - - -@pytest.fixture -def breaking_commit() -> _GitCommit: - return _GitCommit( - hash="ghi11111", - subject="refactor(core): restructure module layout", - body="BREAKING CHANGE: Module paths changed.", - author="dev", - date="2026-01-17T10:00:00+00:00", - files=["src/core/module.cpp"], - ) - - -@pytest.fixture -def plain_commit() -> _GitCommit: - return _GitCommit( - hash="jkl22222", - subject="Update README", - body="", - author="dev", - date="2026-01-18T10:00:00+00:00", - files=["README.md"], - ) - - -# --------------------------------------------------------------------------- -# Tests: _classify_commit -# --------------------------------------------------------------------------- - - -class TestClassifyCommit: - def test_fix_commit(self, fix_commit): - mem_type, confidence = _classify_commit(fix_commit) - assert mem_type == "mem" - assert confidence == "high" - - def test_feat_commit(self, feat_commit): - mem_type, confidence = _classify_commit(feat_commit) - assert mem_type == "fact" - assert confidence == "medium" - - def test_breaking_change(self, breaking_commit): - mem_type, confidence = _classify_commit(breaking_commit) - assert mem_type == "risk" - assert confidence == "high" - - def test_plain_commit(self, plain_commit): - mem_type, confidence = _classify_commit(plain_commit) - assert mem_type == "mem" - assert confidence == "low" - - def test_style_commit(self): - c = _GitCommit(hash="x", subject="style(ui): fix formatting", body="", author="", date="") - mem_type, _ = _classify_commit(c) - assert mem_type == "pref" - - def test_docs_commit(self): - c = _GitCommit(hash="x", subject="docs: update API guide", body="", author="", date="") - mem_type, _ = _classify_commit(c) - assert mem_type == "fact" - - -# --------------------------------------------------------------------------- -# Tests: _extract_scope -# --------------------------------------------------------------------------- - - -class TestExtractScope: - def test_with_scope(self): - assert _extract_scope("fix(gateway): bug") == "gateway" - - def test_without_scope(self): - assert _extract_scope("fix: general bug") == "" - - def test_with_parentheses_in_title(self): - assert _extract_scope("feat(api): add handler (new)") == "api" - - -# --------------------------------------------------------------------------- -# Tests: _infer_tags -# --------------------------------------------------------------------------- - - -class TestInferTags: - def test_includes_repo(self, fix_commit): - tags = _infer_tags(fix_commit, "ros2_medkit") - assert "repo:ros2_medkit" in tags - - def test_includes_scope_as_topic(self, fix_commit): - tags = _infer_tags(fix_commit, "ros2_medkit") - assert "topic:gateway" in tags - - def test_infers_from_paths(self, feat_commit): - tags = _infer_tags(feat_commit, "ros2_medkit") - assert "repo:ros2_medkit" in tags - # Should infer topic from file path - assert any("topic:" in t for t in tags) - - -# --------------------------------------------------------------------------- -# Tests: _file_overlap -# --------------------------------------------------------------------------- - - -class TestFileOverlap: - def test_full_overlap(self): - assert _file_overlap(["a.cpp", "b.cpp"], ["a.cpp", "b.cpp"]) == 1.0 - - def test_no_overlap(self): - assert _file_overlap(["a.cpp"], ["b.cpp"]) == 0.0 - - def test_partial_overlap(self): - overlap = _file_overlap(["a.cpp", "b.cpp"], ["b.cpp", "c.cpp"]) - assert 0.3 < overlap < 0.4 # 1/3 - - def test_empty_files(self): - assert _file_overlap([], []) == 0.0 - - -# --------------------------------------------------------------------------- -# Tests: _group_commits -# --------------------------------------------------------------------------- - - -class TestGroupCommits: - def test_groups_by_file_overlap(self, fix_commit): - c2 = _GitCommit( - hash="222", - subject="fix(gateway): another fix", - body="", - author="", - date="", - files=["src/gateway/server.cpp"], - ) - groups = _group_commits([fix_commit, c2]) - assert len(groups) == 1 # Should be grouped together - - def test_separate_groups_for_unrelated(self, fix_commit, feat_commit): - groups = _group_commits([fix_commit, feat_commit]) - assert len(groups) == 2 # Different files → separate groups - - def test_empty_list(self): - assert _group_commits([]) == [] - - def test_single_commit(self, fix_commit): - groups = _group_commits([fix_commit]) - assert len(groups) == 1 - assert len(groups[0]) == 1 - - -# --------------------------------------------------------------------------- -# Tests: _is_duplicate -# --------------------------------------------------------------------------- - - -class TestIsDuplicate: - def test_similar_title_is_duplicate(self): - candidate = MemoryCandidate( - type="mem", - title="Gateway timeout is 30 seconds", - body="test", - source="commit:abc", - ) - existing = { - "MEM_x": { - "title": "Gateway timeout is 30 seconds by default", - "status": "active", - "source": "", - }, - } - assert _is_duplicate(candidate, existing) - - def test_different_title_not_duplicate(self): - candidate = MemoryCandidate( - type="mem", - title="API supports pagination", - body="test", - source="commit:abc", - ) - existing = { - "MEM_x": { - "title": "Gateway timeout issue", - "status": "active", - "source": "", - }, - } - assert not _is_duplicate(candidate, existing) - - def test_same_source_is_duplicate(self): - candidate = MemoryCandidate( - type="mem", - title="Completely different title", - body="test", - source="commit:abc12345", - ) - existing = { - "MEM_x": { - "title": "Something else", - "status": "active", - "source": "commit:abc12345", - }, - } - assert _is_duplicate(candidate, existing) - - def test_skips_deprecated(self): - candidate = MemoryCandidate( - type="mem", - title="Gateway timeout", - body="", - source="", - ) - existing = { - "MEM_x": { - "title": "Gateway timeout", - "status": "deprecated", - "source": "", - }, - } - assert not _is_duplicate(candidate, existing) - - -# --------------------------------------------------------------------------- -# Tests: format_candidates -# --------------------------------------------------------------------------- - - -class TestFormatCandidates: - def test_empty_candidates(self): - result = format_candidates([]) - assert "No new" in result - - def test_human_format(self): - candidates = [ - MemoryCandidate( - type="mem", - title="Test fix", - body="Fixed a bug", - tags=["topic:test"], - source="commit:abc", - confidence="high", - ), - ] - result = format_candidates(candidates, fmt="human") - assert "Test fix" in result - assert "topic:test" in result - assert "commit:abc" in result - - def test_json_format(self): - candidates = [ - MemoryCandidate(type="fact", title="New feature", body="Added X", tags=["topic:api"]), - ] - result = format_candidates(candidates, fmt="json") - import json - - parsed = json.loads(result) - assert isinstance(parsed, list) - assert len(parsed) == 1 - assert parsed[0]["type"] == "fact" - - def test_multiple_candidates(self): - candidates = [ - MemoryCandidate(type="mem", title=f"Item {i}", body="", tags=[]) for i in range(5) - ] - result = format_candidates(candidates, fmt="human") - assert "5 memory candidate" in result - - -# --------------------------------------------------------------------------- -# Tests: capture_from_git (integration-ish, mocked subprocess) -# --------------------------------------------------------------------------- - - -class TestCaptureFromGit: - def test_no_commits_returns_empty(self, tmp_workspace): - with patch("ai_memory_protocol.capture._parse_git_log", return_value=[]): - candidates = capture_from_git( - workspace=tmp_workspace, - repo_path=Path("/fake/repo"), - since="HEAD~5", - until="HEAD", - ) - assert candidates == [] - - def test_single_commit_creates_candidate(self, tmp_workspace): - mock_commits = [ - _GitCommit( - hash="abc12345", - subject="fix(gateway): timeout bug", - body="Set default to 30s", - author="dev", - date="2026-01-15", - files=["src/server.cpp"], - ), - ] - with ( - patch("ai_memory_protocol.capture._parse_git_log", return_value=mock_commits), - patch("ai_memory_protocol.capture.load_needs", return_value={}), - ): - candidates = capture_from_git( - workspace=tmp_workspace, - repo_path=Path("/fake/repo"), - repo_name="ros2_medkit", - ) - assert len(candidates) == 1 - assert candidates[0].type == "mem" - assert "timeout" in candidates[0].title.lower() - assert "repo:ros2_medkit" in candidates[0].tags - - def test_dedup_filters_existing(self, tmp_workspace): - mock_commits = [ - _GitCommit( - hash="abc12345", - subject="fix: gateway timeout issue", - body="", - author="dev", - date="2026-01-15", - files=[], - ), - ] - existing_needs = { - "MEM_x": { - "title": "gateway timeout issue", - "status": "active", - "source": "", - }, - } - with ( - patch("ai_memory_protocol.capture._parse_git_log", return_value=mock_commits), - patch("ai_memory_protocol.capture.load_needs", return_value=existing_needs), - ): - candidates = capture_from_git( - workspace=tmp_workspace, - repo_path=Path("/fake/repo"), - deduplicate=True, - ) - assert len(candidates) == 0 - - def test_min_confidence_filters(self, tmp_workspace): - mock_commits = [ - _GitCommit( - hash="abc12345", - subject="chore: update deps", - body="", - author="dev", - date="2026-01-15", - files=[], - ), - ] - with ( - patch("ai_memory_protocol.capture._parse_git_log", return_value=mock_commits), - patch("ai_memory_protocol.capture.load_needs", return_value={}), - ): - candidates = capture_from_git( - workspace=tmp_workspace, - repo_path=Path("/fake/repo"), - min_confidence="medium", - ) - assert len(candidates) == 0 # chore → low confidence, filtered out - - -# --------------------------------------------------------------------------- -# Tests: MemoryCandidate -# --------------------------------------------------------------------------- - - -class TestMemoryCandidate: - def test_to_dict(self): - c = MemoryCandidate( - type="mem", - title="Test", - body="Body text", - tags=["topic:test"], - source="commit:abc", - confidence="high", - ) - d = c.to_dict() - assert d["type"] == "mem" - assert d["title"] == "Test" - assert d["confidence"] == "high" - assert "_source_hashes" not in d # Private field excluded - - def test_empty_fields_omitted(self): - c = MemoryCandidate(type="mem", title="Minimal", body="", tags=[]) - d = c.to_dict() - assert "body" not in d - assert "source" not in d - - -# =========================================================================== -# Tests: CI Log Capture -# =========================================================================== - - -class TestParseCILog: - def test_test_failure(self): - log = "FAILED: test_gateway_health\nSome other output" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - assert matches[0].mem_type == "mem" - assert "gateway_health" in matches[0].title - - def test_pytest_failure(self): - log = "FAILED tests/test_api.py::TestHealth::test_endpoint" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - assert "test_api" in matches[0].title or "TestHealth" in matches[0].title - - def test_compiler_error(self): - log = "src/server.cpp:42:10: error: use of undeclared identifier 'foo'" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - assert matches[0].confidence == "high" - assert "server.cpp" in matches[0].title - - def test_deprecation_warning(self): - log = "DeprecationWarning: pkg_resources is deprecated" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - assert matches[0].mem_type == "risk" - - def test_timeout_error(self): - log = "TimeoutError: connection timed out after 30 seconds" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - assert "timeout" in matches[0].title.lower() - - def test_cmake_error(self): - log = "CMake Error at CMakeLists.txt:15: Could not find dependency XYZ" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - assert "cmake" in matches[0].title.lower() or "CMake" in matches[0].detail - - def test_generic_error(self): - log = "Error: file not found: config.yaml" - matches = _parse_ci_log(log) - assert len(matches) >= 1 - - def test_empty_log(self): - matches = _parse_ci_log("") - assert matches == [] - - def test_clean_log_no_matches(self): - log = "Building project...\nCompilation successful.\nAll tests passed." - matches = _parse_ci_log(log) - assert matches == [] - - def test_dedup_within_log(self): - log = "FAILED: test_foo\nFAILED: test_foo\nFAILED: test_bar" - matches = _parse_ci_log(log) - titles = [m.title for m in matches] - # Should not have duplicate titles - assert len(titles) == len(set(titles)) - - -class TestCaptureFromCI: - def test_basic_capture(self, tmp_workspace): - log = "FAILED: test_health_check\nError: connection refused" - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_ci( - workspace=tmp_workspace, - log_text=log, - source="ci:test-run-123", - ) - assert len(candidates) >= 1 - assert all("topic:ci" in c.tags for c in candidates) - assert candidates[0].source == "ci:test-run-123" - - def test_extra_tags(self, tmp_workspace): - log = "FAILED: test_api" - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_ci( - workspace=tmp_workspace, - log_text=log, - tags=["repo:backend", "topic:api"], - ) - assert len(candidates) >= 1 - assert "repo:backend" in candidates[0].tags - assert "topic:ci" in candidates[0].tags - - def test_empty_log_returns_empty(self, tmp_workspace): - candidates = capture_from_ci( - workspace=tmp_workspace, - log_text="All tests passed. Build successful.", - ) - assert candidates == [] - - def test_dedup_against_existing(self, tmp_workspace): - log = "FAILED: test_health_check" - existing = { - "MEM_x": { - "title": "CI test failure: test_health_check", - "status": "active", - "source": "", - }, - } - with patch("ai_memory_protocol.capture.load_needs", return_value=existing): - candidates = capture_from_ci( - workspace=tmp_workspace, - log_text=log, - deduplicate=True, - ) - assert len(candidates) == 0 - - -# =========================================================================== -# Tests: Discussion Capture -# =========================================================================== - - -class TestClassifyStatement: - def test_decision(self): - result = _classify_statement("We decided to use FastAPI for the backend") - assert result is not None - mem_type, title, confidence = result - assert mem_type == "dec" - assert "FastAPI" in title - - def test_lets_go_with(self): - result = _classify_statement("Let's go with PostgreSQL for storage") - assert result is not None - assert result[0] == "dec" - - def test_preference(self): - result = _classify_statement("I prefer TypeScript over JavaScript") - assert result is not None - assert result[0] == "pref" - - def test_convention(self): - result = _classify_statement("Convention: all API responses use camelCase") - assert result is not None - assert result[0] == "pref" - - def test_goal(self): - result = _classify_statement("The goal is to have 80% test coverage") - assert result is not None - assert result[0] == "goal" - - def test_we_need_to(self): - result = _classify_statement("We need to optimize the database queries") - assert result is not None - assert result[0] == "goal" - - def test_todo(self): - result = _classify_statement("TODO: add retry logic for failed requests") - assert result is not None - assert result[0] == "goal" - - def test_fact_turns_out(self): - result = _classify_statement("It turns out the API uses OAuth2 internally") - assert result is not None - assert result[0] == "fact" - - def test_fact_til(self): - result = _classify_statement("TIL: Sphinx-Needs supports needextend directives") - assert result is not None - assert result[0] == "fact" - - def test_risk(self): - result = _classify_statement("Warning: this might break backward compatibility") - assert result is not None - assert result[0] == "risk" - - def test_could_break(self): - result = _classify_statement("This could break the CI pipeline if merged") - assert result is not None - assert result[0] == "risk" - - def test_question(self): - result = _classify_statement("Should we use Redis for caching?") - assert result is not None - assert result[0] == "q" - - def test_open_question(self): - result = _classify_statement("Open question: how do we handle rate limiting?") - assert result is not None - assert result[0] == "q" - - def test_no_match(self): - result = _classify_statement("The weather is nice today") - assert result is None - - def test_too_short(self): - result = _classify_statement("Decided ok") - assert result is None # Title too short after extraction - - -class TestCaptureFromDiscussion: - def test_basic_capture(self, tmp_workspace): - transcript = """ - We decided to use ROS 2 Jazzy for the gateway. - I prefer async/await over callbacks for all new code. - The goal is to have all endpoints documented by March. - Should we support gRPC in addition to REST? - """ - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - source="meeting:standup", - ) - assert len(candidates) >= 3 - types = {c.type for c in candidates} - assert "dec" in types - assert "pref" in types or "goal" in types - - def test_tags_applied(self, tmp_workspace): - transcript = "We decided to deploy on Kubernetes for production" - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - tags=["repo:infra"], - ) - assert len(candidates) >= 1 - assert "topic:discussion" in candidates[0].tags - assert "repo:infra" in candidates[0].tags - - def test_source_label(self, tmp_workspace): - transcript = "The goal is to launch by Q3 2026" - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - source="slack:2026-02-10", - ) - assert len(candidates) >= 1 - assert candidates[0].source == "slack:2026-02-10" - - def test_empty_transcript(self, tmp_workspace): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript="", - ) - assert candidates == [] - - def test_irrelevant_transcript(self, tmp_workspace): - transcript = """ - Good morning everyone. - How was your weekend? - Fine thanks. - """ - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - ) - assert candidates == [] - - def test_dedup_within_transcript(self, tmp_workspace): - transcript = """ - We decided to use PostgreSQL for storage. - As I said, we decided to use PostgreSQL for storage. - """ - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - ) - # Should deduplicate within the same transcript - titles = [c.title.lower() for c in candidates] - assert len(titles) == len(set(titles)) - - def test_strips_prefixes(self, tmp_workspace): - transcript = """ - > We decided to adopt trunk-based development - - TODO: set up branch protection rules - 12:30 I prefer small PRs over large ones - """ - with patch("ai_memory_protocol.capture.load_needs", return_value={}): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - ) - assert len(candidates) >= 2 - - def test_dedup_against_existing(self, tmp_workspace): - transcript = "We decided to use FastAPI for the backend" - existing = { - "DEC_x": { - "title": "use FastAPI for the backend", - "status": "active", - "source": "", - }, - } - with patch("ai_memory_protocol.capture.load_needs", return_value=existing): - candidates = capture_from_discussion( - workspace=tmp_workspace, - transcript=transcript, - deduplicate=True, - ) - assert len(candidates) == 0 diff --git a/tests/test_cli.py b/tests/test_cli.py index 593d6f7..f36ef04 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -60,23 +60,6 @@ def test_init_add_recall(self, tmp_path) -> None: assert result.returncode == 0 assert "FACT_" in result.stdout - def test_doctor(self, tmp_path) -> None: - ws = str(tmp_path / ".memories") - subprocess.run( - ["memory", "init", ws, "--name", "Doctor Test", "--install"], - capture_output=True, - text=True, - timeout=120, - ) - result = subprocess.run( - ["memory", "--dir", ws, "doctor"], - capture_output=True, - text=True, - timeout=30, - ) - # doctor should run without crash - assert result.returncode in (0, 1) - def test_tags_command(self, tmp_path) -> None: ws = str(tmp_path / ".memories") subprocess.run( diff --git a/tests/test_config.py b/tests/test_config.py index 03bd19c..184c040 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -6,6 +6,7 @@ DEFAULT_STATUS, LINK_FIELDS, METADATA_FIELDS, + TYPE_DEFAULT_CONFIDENCE, TYPE_FILES, TYPE_LABELS, TYPE_PREFIXES, @@ -76,3 +77,9 @@ def test_type_files_in_memory_dir(): """All type file paths should be under memory/.""" for typ, path in TYPE_FILES.items(): assert path.startswith("memory/"), f"Type '{typ}' path should start with 'memory/': {path}" + + +def test_type_default_confidence_covers_all_types(): + """Every type should have a default confidence level.""" + for mem_type in TYPE_FILES: + assert mem_type in TYPE_DEFAULT_CONFIDENCE, f"Missing confidence default for {mem_type}" diff --git a/tests/test_executor.py b/tests/test_executor.py deleted file mode 100644 index 029f582..0000000 --- a/tests/test_executor.py +++ /dev/null @@ -1,283 +0,0 @@ -"""Tests for the executor module — action execution and rollback.""" - -from __future__ import annotations - -from pathlib import Path - -import pytest - -from ai_memory_protocol.executor import ( - ExecutionResult, - actions_from_json, - execute_plan, - validate_actions, -) -from ai_memory_protocol.planner import Action -from ai_memory_protocol.rst import ( - append_to_rst, - generate_rst_directive, -) - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture -def workspace_with_memories(tmp_workspace: Path) -> Path: - """Workspace with a few memories already added.""" - for _, (mid, title, tags) in enumerate( - [ - ("MEM_alpha", "Alpha observation", ["topic:test", "repo:demo"]), - ("MEM_beta", "Beta observation", ["topic:test", "repo:demo"]), - ("DEC_choice_a", "Choose option A", ["topic:api"]), - ] - ): - mem_type = mid.split("_")[0].lower() - directive = generate_rst_directive( - mem_type=mem_type, - title=title, - need_id=mid, - tags=tags, - confidence="medium", - ) - append_to_rst(tmp_workspace, mem_type, directive) - return tmp_workspace - - -# --------------------------------------------------------------------------- -# Tests: validate_actions -# --------------------------------------------------------------------------- - - -class TestValidateActions: - def test_valid_retag(self): - actions = [Action(kind="RETAG", reason="fix tags", id="MEM_test")] - valid, skipped = validate_actions(actions) - assert len(valid) == 1 - assert len(skipped) == 0 - - def test_retag_missing_id(self): - actions = [Action(kind="RETAG", reason="fix tags")] - valid, skipped = validate_actions(actions) - assert len(valid) == 0 - assert len(skipped) == 1 - - def test_supersede_missing_old_id(self): - actions = [Action(kind="SUPERSEDE", reason="dup")] - valid, skipped = validate_actions(actions) - assert len(valid) == 0 - assert len(skipped) == 1 - - def test_circular_supersede_detected(self): - actions = [ - Action(kind="SUPERSEDE", reason="dup", old_id="A", by_id="B"), - Action(kind="SUPERSEDE", reason="dup", old_id="B", by_id="A"), - ] - valid, skipped = validate_actions(actions) - # At least one should be skipped for circular reference - assert len(skipped) >= 1 - - def test_update_missing_id(self): - actions = [Action(kind="UPDATE", reason="stale", field_changes={"status": "review"})] - valid, skipped = validate_actions(actions) - assert len(valid) == 0 - assert len(skipped) == 1 - - def test_split_file_missing_path(self): - actions = [Action(kind="SPLIT_FILE", reason="too big")] - valid, skipped = validate_actions(actions) - assert len(valid) == 0 - assert len(skipped) == 1 - - def test_mixed_valid_and_invalid(self): - actions = [ - Action(kind="RETAG", reason="good", id="MEM_x"), - Action(kind="RETAG", reason="bad"), # missing id - Action(kind="UPDATE", reason="ok", id="MEM_y", field_changes={"status": "review"}), - ] - valid, skipped = validate_actions(actions) - assert len(valid) == 2 - assert len(skipped) == 1 - - -# --------------------------------------------------------------------------- -# Tests: execute_plan -# --------------------------------------------------------------------------- - - -class TestExecutePlan: - def test_empty_actions(self, workspace_with_memories): - result = execute_plan(workspace_with_memories, []) - assert result.success - - def test_retag_action(self, workspace_with_memories): - actions = [ - Action( - kind="RETAG", - reason="add missing tag", - id="MEM_alpha", - add_tags=["tier:core"], - ) - ] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - - def test_update_action(self, workspace_with_memories): - actions = [ - Action( - kind="UPDATE", - reason="mark for review", - id="MEM_alpha", - field_changes={"status": "review"}, - ) - ] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - - def test_deprecate_action(self, workspace_with_memories): - actions = [Action(kind="DEPRECATE", reason="outdated", id="MEM_beta")] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - - def test_supersede_action(self, workspace_with_memories): - actions = [ - Action( - kind="SUPERSEDE", - reason="duplicate", - old_id="MEM_beta", - by_id="MEM_alpha", - ) - ] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - - def test_invalid_action_skipped(self, workspace_with_memories): - actions = [ - Action(kind="RETAG", reason="no id"), # Missing id - Action(kind="RETAG", reason="valid", id="MEM_alpha", add_tags=["topic:new"]), - ] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - assert len(result.skipped) == 1 - - def test_split_file_informational(self, workspace_with_memories): - actions = [ - Action(kind="SPLIT_FILE", reason="too large", rst_path="/some/path.rst"), - ] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - - def test_prune_action(self, workspace_with_memories): - actions = [Action(kind="PRUNE", reason="irrelevant", id="MEM_alpha")] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 1 - - def test_unknown_action_kind(self, workspace_with_memories): - actions = [Action(kind="RETAG", reason="valid", id="MEM_alpha")] - # Patch the kind after creation to test unknown handling - actions[0].kind = "NONEXISTENT" # type: ignore[assignment] - valid, _ = validate_actions(actions) - result = execute_plan(workspace_with_memories, valid, rebuild=False) - assert len(result.failed) == 1 - - def test_multiple_actions_sequential(self, workspace_with_memories): - actions = [ - Action(kind="RETAG", reason="add tag", id="MEM_alpha", add_tags=["tier:core"]), - Action( - kind="UPDATE", - reason="update status", - id="MEM_beta", - field_changes={"confidence": "high"}, - ), - ] - result = execute_plan(workspace_with_memories, actions, rebuild=False) - assert result.success - assert len(result.applied) == 2 - - -# --------------------------------------------------------------------------- -# Tests: ExecutionResult -# --------------------------------------------------------------------------- - - -class TestExecutionResult: - def test_to_dict(self): - result = ExecutionResult( - success=True, - applied=[{"action": {"kind": "RETAG"}, "message": "done"}], - message="OK", - ) - d = result.to_dict() - assert d["success"] is True - assert d["applied_count"] == 1 - assert d["failed_count"] == 0 - - def test_summary(self): - result = ExecutionResult( - success=True, - applied=[{}], - failed=[{}], - skipped=[{}], - message="test", - ) - s = result.summary() - assert "Applied: 1" in s - assert "Failed: 1" in s - assert "Skipped: 1" in s - - -# --------------------------------------------------------------------------- -# Tests: actions_from_json -# --------------------------------------------------------------------------- - - -class TestActionsFromJson: - def test_basic_deserialization(self): - data = [ - {"kind": "RETAG", "reason": "fix tags", "id": "MEM_x", "add_tags": ["topic:new"]}, - { - "kind": "UPDATE", - "reason": "stale", - "id": "MEM_y", - "field_changes": {"status": "review"}, - }, - ] - actions = actions_from_json(data) - assert len(actions) == 2 - assert actions[0].kind == "RETAG" - assert actions[0].add_tags == ["topic:new"] - assert actions[1].field_changes == {"status": "review"} - - def test_empty_list(self): - assert actions_from_json([]) == [] - - def test_defaults_for_missing_fields(self): - data = [{"kind": "DEPRECATE", "reason": "old", "id": "MEM_z"}] - actions = actions_from_json(data) - assert actions[0].add_tags == [] - assert actions[0].field_changes == {} - assert actions[0].by_id == "" - - def test_roundtrip(self): - """Action → to_dict → actions_from_json → same action.""" - original = Action( - kind="SUPERSEDE", - reason="duplicate", - old_id="MEM_old", - by_id="MEM_new", - new_tags=["topic:x"], - ) - d = original.to_dict() - restored = actions_from_json([d])[0] - assert restored.kind == original.kind - assert restored.old_id == original.old_id - assert restored.by_id == original.by_id - assert restored.new_tags == original.new_tags diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 851ce5e..ca7427d 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -33,7 +33,7 @@ def test_create_server(self) -> None: class TestMCPToolDefinitions: def test_tool_count(self) -> None: - assert len(TOOLS) >= 13 + assert len(TOOLS) == 8 def test_all_tools_have_schemas(self) -> None: for tool in TOOLS: @@ -51,13 +51,17 @@ def test_required_tools_present(self) -> None: "memory_tags", "memory_stale", "memory_rebuild", - "memory_plan", - "memory_apply", + ]: + assert expected in names, f"Missing tool: {expected}" + # Verify removed tools are gone + for removed in [ "memory_capture_git", "memory_capture_ci", "memory_capture_discussion", + "memory_plan", + "memory_apply", ]: - assert expected in names, f"Missing tool: {expected}" + assert removed not in names, f"Tool should be removed: {removed}" def test_tools_have_descriptions(self) -> None: for tool in TOOLS: diff --git a/tests/test_planner.py b/tests/test_planner.py deleted file mode 100644 index 092aa6c..0000000 --- a/tests/test_planner.py +++ /dev/null @@ -1,555 +0,0 @@ -"""Tests for the planner module — detection algorithms and plan formatting.""" - -from __future__ import annotations - -import json -from datetime import date, timedelta - -import pytest - -from ai_memory_protocol.planner import ( - Action, - detect_auto_summaries, - detect_conflicts, - detect_duplicates, - detect_missing_tags, - detect_split_files, - detect_stale, - detect_tag_normalization, - format_plan, - run_plan, -) - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture -def needs_with_duplicates() -> dict: - """Two near-duplicate active needs.""" - return { - "MEM_gateway_timeout": { - "id": "MEM_gateway_timeout", - "type": "mem", - "title": "Gateway timeout is 30 seconds", - "description": "Default timeout.", - "status": "active", - "tags": ["topic:gateway", "repo:ros2_medkit"], - "confidence": "medium", - "created_at": "2026-01-10", - }, - "MEM_gateway_timeout_issue": { - "id": "MEM_gateway_timeout_issue", - "type": "mem", - "title": "Gateway timeout is 30 seconds by default", - "description": "Same info.", - "status": "active", - "tags": ["topic:gateway", "repo:ros2_medkit"], - "confidence": "high", - "created_at": "2026-01-15", - }, - } - - -@pytest.fixture -def needs_with_missing_tags() -> dict: - """Needs missing topic: or repo: tags.""" - return { - "MEM_no_topic": { - "id": "MEM_no_topic", - "type": "mem", - "title": "An observation", - "status": "active", - "tags": ["repo:ros2_medkit"], - "confidence": "medium", - }, - "MEM_no_repo": { - "id": "MEM_no_repo", - "type": "mem", - "title": "Another observation", - "status": "active", - "tags": ["topic:gateway"], - "confidence": "medium", - }, - "MEM_no_tags_at_all": { - "id": "MEM_no_tags_at_all", - "type": "mem", - "title": "Missing all tags", - "status": "active", - "tags": [], - "confidence": "low", - }, - "MEM_complete": { - "id": "MEM_complete", - "type": "mem", - "title": "Complete one", - "status": "active", - "tags": ["topic:api", "repo:ros2_medkit"], - "confidence": "high", - }, - } - - -@pytest.fixture -def needs_with_stale() -> dict: - """Needs with expired/review-overdue dates.""" - yesterday = (date.today() - timedelta(days=1)).isoformat() - tomorrow = (date.today() + timedelta(days=1)).isoformat() - return { - "MEM_expired": { - "id": "MEM_expired", - "type": "mem", - "title": "Old memory", - "status": "active", - "tags": ["topic:test"], - "expires_at": yesterday, - "review_after": "", - }, - "MEM_review_due": { - "id": "MEM_review_due", - "type": "mem", - "title": "Review needed", - "status": "active", - "tags": ["topic:test"], - "expires_at": "", - "review_after": yesterday, - }, - "MEM_still_fresh": { - "id": "MEM_still_fresh", - "type": "mem", - "title": "Still fresh", - "status": "active", - "tags": ["topic:test"], - "expires_at": "", - "review_after": tomorrow, - }, - } - - -@pytest.fixture -def needs_with_tag_issues() -> dict: - """Needs with case-inconsistent tags.""" - return { - "MEM_one": { - "id": "MEM_one", - "type": "mem", - "title": "First", - "status": "active", - "tags": ["topic:Gateway", "repo:ros2_medkit"], - }, - "MEM_two": { - "id": "MEM_two", - "type": "mem", - "title": "Second", - "status": "active", - "tags": ["topic:gateway", "repo:ros2_medkit"], - }, - "MEM_three": { - "id": "MEM_three", - "type": "mem", - "title": "Third", - "status": "active", - "tags": ["topic:gateway", "repo:ros2_medkit"], - }, - } - - -@pytest.fixture -def needs_with_conflicts() -> dict: - """Two decisions on the same topic with no link.""" - return { - "DEC_use_rest": { - "id": "DEC_use_rest", - "type": "dec", - "title": "Use REST for API", - "status": "active", - "tags": ["topic:api"], - }, - "DEC_use_grpc": { - "id": "DEC_use_grpc", - "type": "dec", - "title": "Use gRPC for API", - "status": "active", - "tags": ["topic:api"], - }, - } - - -# --------------------------------------------------------------------------- -# Tests: detect_duplicates -# --------------------------------------------------------------------------- - - -class TestDetectDuplicates: - def test_finds_near_duplicates(self, needs_with_duplicates): - actions = detect_duplicates(needs_with_duplicates) - assert len(actions) == 1 - assert actions[0].kind == "SUPERSEDE" - - def test_prefers_higher_confidence(self, needs_with_duplicates): - actions = detect_duplicates(needs_with_duplicates) - # The higher-confidence one should be kept - action = actions[0] - assert action.by_id == "MEM_gateway_timeout_issue" # high confidence - assert action.old_id == "MEM_gateway_timeout" # medium confidence - - def test_no_duplicates_for_different_titles(self, sample_needs): - actions = detect_duplicates(sample_needs) - assert len(actions) == 0 - - def test_skips_deprecated(self, sample_needs): - """Deprecated needs should not be flagged as duplicates.""" - actions = detect_duplicates(sample_needs) - assert all(a.old_id != "FACT_deprecated" for a in actions) - - def test_threshold_respected(self, needs_with_duplicates): - # With very high threshold, should find nothing - actions = detect_duplicates(needs_with_duplicates, title_threshold=0.99) - assert len(actions) == 0 - - def test_tag_overlap_threshold(self, needs_with_duplicates): - # With very high tag overlap requirement, should still match (100% overlap) - actions = detect_duplicates( - needs_with_duplicates, title_threshold=0.8, tag_overlap_threshold=0.9 - ) - assert len(actions) == 1 - - -# --------------------------------------------------------------------------- -# Tests: detect_missing_tags -# --------------------------------------------------------------------------- - - -class TestDetectMissingTags: - def test_finds_missing_topic(self, needs_with_missing_tags): - actions = detect_missing_tags(needs_with_missing_tags) - ids_with_actions = {a.id for a in actions} - assert "MEM_no_topic" in ids_with_actions - - def test_finds_missing_repo(self, needs_with_missing_tags): - actions = detect_missing_tags(needs_with_missing_tags) - ids_with_actions = {a.id for a in actions} - assert "MEM_no_repo" in ids_with_actions - - def test_finds_missing_both(self, needs_with_missing_tags): - actions = detect_missing_tags(needs_with_missing_tags) - ids_with_actions = {a.id for a in actions} - assert "MEM_no_tags_at_all" in ids_with_actions - - def test_skips_complete(self, needs_with_missing_tags): - actions = detect_missing_tags(needs_with_missing_tags) - ids_with_actions = {a.id for a in actions} - assert "MEM_complete" not in ids_with_actions - - def test_action_type_is_retag(self, needs_with_missing_tags): - actions = detect_missing_tags(needs_with_missing_tags) - assert all(a.kind == "RETAG" for a in actions) - - -# --------------------------------------------------------------------------- -# Tests: detect_stale -# --------------------------------------------------------------------------- - - -class TestDetectStale: - def test_finds_expired(self, needs_with_stale): - actions = detect_stale(needs_with_stale) - ids = {a.id for a in actions} - assert "MEM_expired" in ids - - def test_finds_review_overdue(self, needs_with_stale): - actions = detect_stale(needs_with_stale) - ids = {a.id for a in actions} - assert "MEM_review_due" in ids - - def test_skips_fresh(self, needs_with_stale): - actions = detect_stale(needs_with_stale) - ids = {a.id for a in actions} - assert "MEM_still_fresh" not in ids - - def test_action_type_is_update(self, needs_with_stale): - actions = detect_stale(needs_with_stale) - assert all(a.kind == "UPDATE" for a in actions) - for a in actions: - assert a.field_changes.get("status") == "review" - - -# --------------------------------------------------------------------------- -# Tests: detect_conflicts -# --------------------------------------------------------------------------- - - -class TestDetectConflicts: - def test_finds_unlinked_decisions(self, needs_with_conflicts): - actions = detect_conflicts(needs_with_conflicts) - assert len(actions) >= 1 - - def test_skips_linked_decisions(self): - needs = { - "DEC_a": { - "type": "dec", - "title": "A", - "status": "active", - "tags": ["topic:api"], - "relates": ["DEC_b"], - }, - "DEC_b": { - "type": "dec", - "title": "B", - "status": "active", - "tags": ["topic:api"], - }, - } - actions = detect_conflicts(needs) - assert len(actions) == 0 - - -# --------------------------------------------------------------------------- -# Tests: detect_tag_normalization -# --------------------------------------------------------------------------- - - -class TestDetectTagNormalization: - def test_finds_inconsistent_case(self, needs_with_tag_issues): - actions = detect_tag_normalization(needs_with_tag_issues) - assert len(actions) >= 1 - - def test_normalizes_to_most_common(self, needs_with_tag_issues): - actions = detect_tag_normalization(needs_with_tag_issues) - # "topic:gateway" appears twice, "topic:Gateway" once → normalize to lowercase - for a in actions: - if "Gateway" in str(a.remove_tags): - assert "topic:gateway" in a.add_tags - - def test_action_type_is_retag(self, needs_with_tag_issues): - actions = detect_tag_normalization(needs_with_tag_issues) - assert all(a.kind == "RETAG" for a in actions) - - -# --------------------------------------------------------------------------- -# Tests: detect_auto_summaries -# --------------------------------------------------------------------------- - - -class TestDetectAutoSummaries: - @pytest.fixture - def needs_many_old_observations(self) -> dict: - """Six old observations on the same topic.""" - old_date = (date.today() - timedelta(days=90)).isoformat() - return { - f"MEM_obs_{i}": { - "id": f"MEM_obs_{i}", - "type": "mem", - "title": f"Observation {i} about gateway", - "status": "active", - "tags": ["topic:gateway", "repo:ros2_medkit"], - "confidence": "medium", - "created_at": old_date, - } - for i in range(6) - } - - def test_finds_consolidation_candidates(self, needs_many_old_observations): - actions = detect_auto_summaries(needs_many_old_observations) - assert len(actions) == 1 - assert actions[0].kind == "SUPERSEDE" - assert actions[0].new_type == "fact" - assert "gateway" in actions[0].new_title.lower() - - def test_includes_all_ids(self, needs_many_old_observations): - actions = detect_auto_summaries(needs_many_old_observations) - old_ids = actions[0].old_id.split(",") - assert len(old_ids) == 6 - - def test_collects_tags_from_group(self, needs_many_old_observations): - actions = detect_auto_summaries(needs_many_old_observations) - assert "topic:gateway" in actions[0].new_tags - assert "repo:ros2_medkit" in actions[0].new_tags - - def test_skips_below_threshold(self): - """Fewer than min_count entries should not trigger.""" - old_date = (date.today() - timedelta(days=90)).isoformat() - needs = { - f"MEM_obs_{i}": { - "id": f"MEM_obs_{i}", - "type": "mem", - "title": f"Observation {i}", - "status": "active", - "tags": ["topic:gateway"], - "created_at": old_date, - } - for i in range(4) # Only 4, below default min_count=5 - } - actions = detect_auto_summaries(needs) - assert len(actions) == 0 - - def test_skips_recent_observations(self): - """Recent observations should not be consolidated.""" - recent_date = (date.today() - timedelta(days=10)).isoformat() - needs = { - f"MEM_obs_{i}": { - "id": f"MEM_obs_{i}", - "type": "mem", - "title": f"Observation {i}", - "status": "active", - "tags": ["topic:gateway"], - "created_at": recent_date, - } - for i in range(6) - } - actions = detect_auto_summaries(needs) - assert len(actions) == 0 - - def test_skips_non_mem_types(self): - """Only 'mem' type entries should be considered.""" - old_date = (date.today() - timedelta(days=90)).isoformat() - needs = { - f"FACT_obs_{i}": { - "id": f"FACT_obs_{i}", - "type": "fact", - "title": f"Fact {i}", - "status": "active", - "tags": ["topic:gateway"], - "created_at": old_date, - } - for i in range(6) - } - actions = detect_auto_summaries(needs) - assert len(actions) == 0 - - def test_skips_deprecated(self): - """Deprecated observations should be excluded.""" - old_date = (date.today() - timedelta(days=90)).isoformat() - needs = { - f"MEM_obs_{i}": { - "id": f"MEM_obs_{i}", - "type": "mem", - "title": f"Observation {i}", - "status": "deprecated", - "tags": ["topic:gateway"], - "created_at": old_date, - } - for i in range(6) - } - actions = detect_auto_summaries(needs) - assert len(actions) == 0 - - def test_custom_thresholds(self): - """Custom min_count and min_age_days.""" - old_date = (date.today() - timedelta(days=30)).isoformat() - needs = { - f"MEM_obs_{i}": { - "id": f"MEM_obs_{i}", - "type": "mem", - "title": f"Observation {i}", - "status": "active", - "tags": ["topic:gateway"], - "created_at": old_date, - } - for i in range(3) - } - # Default thresholds: should not trigger - assert len(detect_auto_summaries(needs)) == 0 - # Lower thresholds: should trigger - actions = detect_auto_summaries(needs, min_count=3, min_age_days=20) - assert len(actions) == 1 - - -# --------------------------------------------------------------------------- -# Tests: detect_split_files -# --------------------------------------------------------------------------- - - -class TestDetectSplitFiles: - def test_no_split_needed(self, tmp_workspace): - actions = detect_split_files(tmp_workspace) - assert len(actions) == 0 - - def test_detects_oversized(self, tmp_workspace): - # Write many directives to a file - rst_path = tmp_workspace / "memory" / "observations.rst" - content = rst_path.read_text() - for i in range(55): - content += f"\n.. mem:: Entry {i}\n :id: MEM_entry_{i}\n\n Body text.\n" - rst_path.write_text(content) - - actions = detect_split_files(tmp_workspace) - assert len(actions) >= 1 - assert actions[0].kind == "SPLIT_FILE" - - -# --------------------------------------------------------------------------- -# Tests: run_plan -# --------------------------------------------------------------------------- - - -class TestRunPlan: - def test_runs_all_checks(self, needs_json_file, sample_needs): - actions = run_plan(needs_json_file, needs=sample_needs) - # sample_needs has missing repo tags on DEC_use_httplib - assert isinstance(actions, list) - - def test_runs_specific_checks(self, needs_json_file, sample_needs): - actions = run_plan(needs_json_file, checks=["missing_tags"], needs=sample_needs) - # Only RETAG actions from missing_tags check - assert all(a.kind == "RETAG" for a in actions) - - def test_empty_needs_returns_empty(self, needs_json_file): - actions = run_plan(needs_json_file, needs={}) - assert actions == [] - - -# --------------------------------------------------------------------------- -# Tests: format_plan -# --------------------------------------------------------------------------- - - -class TestFormatPlan: - def test_empty_plan(self): - result = format_plan([]) - assert "healthy" in result.lower() - - def test_human_format(self): - actions = [ - Action(kind="RETAG", reason="Missing topic tag", id="MEM_test"), - ] - result = format_plan(actions, fmt="human") - assert "RETAG" in result - assert "MEM_test" in result - - def test_json_format(self): - actions = [ - Action(kind="UPDATE", reason="Stale", id="MEM_old", field_changes={"status": "review"}), - ] - result = format_plan(actions, fmt="json") - parsed = json.loads(result) - assert isinstance(parsed, list) - assert len(parsed) == 1 - assert parsed[0]["kind"] == "UPDATE" - - -# --------------------------------------------------------------------------- -# Tests: Action dataclass -# --------------------------------------------------------------------------- - - -class TestAction: - def test_to_dict_omits_empty(self): - a = Action(kind="RETAG", reason="test", id="MEM_x", add_tags=["topic:new"]) - d = a.to_dict() - assert "remove_tags" not in d - assert "field_changes" not in d - assert d["kind"] == "RETAG" - assert d["id"] == "MEM_x" - - def test_supersede_action(self): - a = Action( - kind="SUPERSEDE", - reason="duplicate", - old_id="MEM_old", - by_id="MEM_new", - ) - d = a.to_dict() - assert d["old_id"] == "MEM_old" - assert d["by_id"] == "MEM_new" diff --git a/tests/test_rst.py b/tests/test_rst.py index 4894ed1..9c4a22d 100644 --- a/tests/test_rst.py +++ b/tests/test_rst.py @@ -239,3 +239,146 @@ def test_deprecate_with_superseded_by(self, tmp_workspace: Path) -> None: def test_deprecate_nonexistent(self, tmp_workspace: Path) -> None: ok, msg = deprecate_in_rst(tmp_workspace, "MEM_nonexistent") assert not ok + + +class TestUpdateBodyInRst: + def test_replace_body(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_body_in_rst + + directive = generate_rst_directive( + "fact", "Test Fact", tags=["topic:test"], body="Original body text here." + ) + append_to_rst(tmp_workspace, "fact", directive) + + ok, msg = update_body_in_rst( + tmp_workspace, "FACT_test_fact", "Updated body with new content." + ) + assert ok, msg + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert "Updated body with new content." in content + assert "Original body text here." not in content + + def test_replace_multiline_body(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_body_in_rst + + directive = generate_rst_directive( + "dec", "Test Dec", tags=["topic:test"], body="Line one. Line two. Line three." + ) + append_to_rst(tmp_workspace, "dec", directive) + + ok, msg = update_body_in_rst(tmp_workspace, "DEC_test_dec", "New single line body.") + assert ok, msg + content = (tmp_workspace / "memory" / "decisions.rst").read_text() + assert "New single line body." in content + + def test_body_not_found(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_body_in_rst + + ok, msg = update_body_in_rst(tmp_workspace, "FACT_nonexistent", "New body.") + assert not ok + + def test_replace_body_preserves_adjacent_directive(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_body_in_rst + + d1 = generate_rst_directive( + "mem", "First", need_id="MEM_first", tags=["topic:test"], body="Body one." + ) + d2 = generate_rst_directive( + "mem", "Second", need_id="MEM_second", tags=["topic:test"], body="Body two." + ) + append_to_rst(tmp_workspace, "mem", d1) + append_to_rst(tmp_workspace, "mem", d2) + + ok, msg = update_body_in_rst(tmp_workspace, "MEM_first", "Updated body one.") + assert ok, msg + content = (tmp_workspace / "memory" / "observations.rst").read_text() + assert "Updated body one." in content + assert "Body two." in content + + def test_body_update_sets_updated_at(self, tmp_workspace: Path) -> None: + from datetime import date + + from ai_memory_protocol.rst import update_body_in_rst + + directive = generate_rst_directive( + "mem", "TS Test", need_id="MEM_ts_test", tags=["topic:test"], body="Old." + ) + append_to_rst(tmp_workspace, "mem", directive) + ok, _ = update_body_in_rst(tmp_workspace, "MEM_ts_test", "New.") + assert ok + content = (tmp_workspace / "memory" / "observations.rst").read_text() + assert f":updated_at: {date.today().isoformat()}" in content + + +class TestUpdateTitleInRst: + def test_replace_title(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_title_in_rst + + directive = generate_rst_directive( + "fact", "Old Title", tags=["topic:test"], body="Some body." + ) + append_to_rst(tmp_workspace, "fact", directive) + + ok, msg = update_title_in_rst(tmp_workspace, "FACT_old_title", "New Title") + assert ok, msg + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert ".. fact:: New Title" in content + assert ".. fact:: Old Title" not in content + + def test_title_not_found(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_title_in_rst + + ok, msg = update_title_in_rst(tmp_workspace, "FACT_nonexistent", "New") + assert not ok + + def test_replace_title_with_colon(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_title_in_rst + + directive = generate_rst_directive( + "fact", "API endpoint", need_id="FACT_api", tags=["topic:test"], body="Some body." + ) + append_to_rst(tmp_workspace, "fact", directive) + + ok, msg = update_title_in_rst(tmp_workspace, "FACT_api", "API: new endpoint design") + assert ok, msg + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert ".. fact:: API: new endpoint design" in content + + def test_replace_title_among_multiple_directives(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_title_in_rst + + d1 = generate_rst_directive( + "fact", "First fact", need_id="FACT_first", tags=["topic:test"], body="Body." + ) + d2 = generate_rst_directive( + "fact", "Second fact", need_id="FACT_second", tags=["topic:test"], body="Body." + ) + append_to_rst(tmp_workspace, "fact", d1) + append_to_rst(tmp_workspace, "fact", d2) + + ok, msg = update_title_in_rst(tmp_workspace, "FACT_second", "Renamed second") + assert ok, msg + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert ".. fact:: First fact" in content + assert ".. fact:: Renamed second" in content + + def test_title_sanitizes_newlines(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_title_in_rst + + directive = generate_rst_directive( + "fact", "Original", need_id="FACT_orig", tags=["topic:test"], body="Body." + ) + append_to_rst(tmp_workspace, "fact", directive) + + ok, msg = update_title_in_rst(tmp_workspace, "FACT_orig", "Line1\nLine2") + assert ok, msg + content = (tmp_workspace / "memory" / "facts.rst").read_text() + assert ".. fact:: Line1 Line2" in content + assert "\nLine2" not in content + + def test_empty_title_rejected(self, tmp_workspace: Path) -> None: + from ai_memory_protocol.rst import update_title_in_rst + + ok, msg = update_title_in_rst(tmp_workspace, "FACT_x", "") + assert not ok + assert "empty" in msg.lower() diff --git a/tests/test_scaffold.py b/tests/test_scaffold.py index 5f99331..99953fb 100644 --- a/tests/test_scaffold.py +++ b/tests/test_scaffold.py @@ -67,3 +67,18 @@ def test_custom_author(self, tmp_path: Path) -> None: init_workspace(ws, project_name="Test", author="testauthor") content = (ws / "conf.py").read_text() assert "testauthor" in content + + +class TestConfPyWarnings: + def test_scaffold_includes_needs_warnings(self, tmp_path: Path) -> None: + from ai_memory_protocol.scaffold import init_workspace + + ws = tmp_path / "test_ws" + init_workspace(ws, "Test", "Author") + conf_content = (ws / "conf.py").read_text() + assert "needs_warnings" in conf_content + assert "missing_topic_tag" in conf_content + assert "tag_case_mismatch" in conf_content + assert "isolated_decision" in conf_content + assert "suspicious_high_confidence" in conf_content + assert "isolated_memory" in conf_content