diff --git a/src/vouch/health.py b/src/vouch/health.py index ac3fe510..17b6292c 100644 --- a/src/vouch/health.py +++ b/src/vouch/health.py @@ -7,14 +7,17 @@ from __future__ import annotations +import shutil +import tempfile +import warnings from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from pathlib import Path from . import index_db from .audit import count_events -from .models import ClaimStatus, ProposalStatus -from .storage import KBStore, sha256_hex +from .models import Claim, ClaimStatus, Entity, Page, ProposalStatus +from .storage import KBStore, _deserialize_page, _yaml_load, sha256_hex from .verify import verify_all @@ -140,14 +143,35 @@ def doctor(store: KBStore) -> HealthReport: "info", "index_missing", "state.db not present — run `vouch index` to build it", )) + else: + # Detect a blown index: DB exists but FTS is empty while artifact files + # are present on disk. This is the aftermath of a crashed rebuild_index. + stats = index_db.stats(store.kb_dir) + claims_on_disk = len(list((store.kb_dir / "claims").glob("*.yaml"))) + pages_on_disk = len(list((store.kb_dir / "pages").glob("*.md"))) + entities_on_disk = len(list((store.kb_dir / "entities").glob("*.yaml"))) + artifacts_on_disk = claims_on_disk + pages_on_disk + entities_on_disk + indexed = stats["claims"] + stats["pages"] + stats["entities"] + if artifacts_on_disk > 0 and indexed == 0: + report.findings.append(Finding( + "warning", "index_blown", + "state.db is empty but artifact files exist on disk — " + "run `vouch index` to rebuild the search index", + )) report.ok = not any(f.severity == "error" for f in report.findings) return report def rebuild_index(store: KBStore) -> dict: - """Drop and rebuild state.db from the durable files. Idempotent.""" - # Detect a stale embedding-model identity before reset() wipes the meta. + """Rebuild state.db from durable files, atomically. + + Writes into a temp file in the same directory (so the final rename is + atomic on any POSIX filesystem), then swaps it into place only on full + success. If anything raises — bad YAML, validation error, I/O — the + temp file is deleted and the existing index is left untouched. + """ + # Detect a stale embedding-model identity before we discard the old meta. try: from . import audit from .embeddings.migration import detect_mismatch @@ -160,50 +184,113 @@ def rebuild_index(store: KBStore) -> dict: ) except ImportError: pass - index_db.reset(store.kb_dir) - with index_db.open_db(store.kb_dir) as conn: - for c in store.list_claims(): - index_db.index_claim( - conn, id=c.id, text=c.text, - type=c.type.value, status=c.status.value, tags=c.tags, - ) - for p in store.list_pages(): - index_db.index_page( - conn, id=p.id, title=p.title, body=p.body, - type=p.type.value, tags=p.tags, - ) - for e in store.list_entities(): - index_db.index_entity( - conn, id=e.id, name=e.name, description=e.description, - type=e.type.value, aliases=e.aliases, - ) - _rebuild_embeddings(store) + + db_path = store.kb_dir / index_db.DB_FILENAME + store.kb_dir.mkdir(parents=True, exist_ok=True) + + with tempfile.NamedTemporaryFile( + dir=store.kb_dir, suffix=".db.tmp", delete=False, + ) as tmp: + tmp_path = Path(tmp.name) + + skipped: list[tuple[str, str]] = [] + claims: list[Claim] = [] + pages: list[Page] = [] + entities: list[Entity] = [] + try: + with index_db.open_db_at(tmp_path) as conn: + for claim_path in sorted((store.kb_dir / "claims").glob("*.yaml")): + try: + c: Claim = Claim.model_validate( + _yaml_load(claim_path.read_text()), + ) + except Exception as exc: + skipped.append((claim_path.name, str(exc))) + continue + claims.append(c) + index_db.index_claim( + conn, id=c.id, text=c.text, + type=c.type.value, status=c.status.value, tags=c.tags, + ) + for page_path in sorted((store.kb_dir / "pages").glob("*.md")): + try: + pg: Page = _deserialize_page(page_path.read_text()) + except Exception as exc: + skipped.append((page_path.name, str(exc))) + continue + pages.append(pg) + index_db.index_page( + conn, id=pg.id, title=pg.title, body=pg.body, + type=pg.type.value, tags=pg.tags, + ) + for entity_path in sorted((store.kb_dir / "entities").glob("*.yaml")): + try: + e: Entity = Entity.model_validate( + _yaml_load(entity_path.read_text()), + ) + except Exception as exc: + skipped.append((entity_path.name, str(exc))) + continue + entities.append(e) + index_db.index_entity( + conn, id=e.id, name=e.name, description=e.description, + type=e.type.value, aliases=e.aliases, + ) + # Embeddings are built into the temp DB before the swap so the + # entire operation remains atomic — a failure here aborts the + # rename and the live index is left untouched. + _write_embeddings(conn, claims=claims, pages=pages, entities=entities) + shutil.move(str(tmp_path), str(db_path)) + except Exception: + tmp_path.unlink(missing_ok=True) + raise + + if skipped: + warnings.warn( + f"rebuild_index skipped {len(skipped)} unreadable artifact(s): {skipped}", + stacklevel=2, + ) + return index_db.stats(store.kb_dir) -def _rebuild_embeddings(store: KBStore) -> None: +def _write_embeddings( # type: ignore[no-untyped-def] + conn, + *, + claims: list[Claim], + pages: list[Page], + entities: list[Entity], +) -> None: + """Write embedding vectors into *conn* (caller owns the connection/commit). + + No-ops when the embeddings extra is not installed or no embedder is + registered. Other embedder startup failures propagate so rebuild_index + does not silently swap in an index with an empty embedding_index. + """ try: from .embeddings import get_embedder + except ImportError: + return + try: embedder = get_embedder() - except Exception: + except (KeyError, ImportError): + return + texts: list[tuple[str, str, str]] = [] + for c in claims: + texts.append(("claim", c.id, c.text)) + for pg in pages: + texts.append(("page", pg.id, f"{pg.title} {pg.body}")) + for e in entities: + texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) + if not texts: return - with index_db.open_db(store.kb_dir) as conn: - texts: list[tuple[str, str, str]] = [] - for c in store.list_claims(): - texts.append(("claim", c.id, c.text)) - for p in store.list_pages(): - texts.append(("page", p.id, f"{p.title} {p.body}")) - for e in store.list_entities(): - texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) - if not texts: - return - batch_size = 64 - for i in range(0, len(texts), batch_size): - batch = texts[i:i + batch_size] - vecs = embedder.encode_batch([t[2] for t in batch]) - for (kind, eid, _), row in zip(batch, vecs, strict=True): - index_db.index_embedding(conn, kind=kind, id=eid, - vec=row.tolist()) + batch_size = 64 + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + vecs = embedder.encode_batch([t[2] for t in batch]) + for (kind, eid, _), row in zip(batch, vecs, strict=True): + index_db.index_embedding(conn, kind=kind, id=eid, + vec=row.tolist()) # --- helpers used by `vouch discover` (CLI) ------------------------------- diff --git a/src/vouch/index_db.py b/src/vouch/index_db.py index 7d026273..40a13987 100644 --- a/src/vouch/index_db.py +++ b/src/vouch/index_db.py @@ -95,6 +95,19 @@ def open_db(kb_dir: Path): conn.close() +@contextmanager +def open_db_at(path: Path): + """Like open_db but accepts an explicit path — used for atomic temp-file rebuilds.""" + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(path)) + try: + conn.executescript(SCHEMA) + yield conn + conn.commit() + finally: + conn.close() + + def reset(kb_dir: Path) -> None: """Drop everything; the rebuild caller re-populates. diff --git a/tests/test_index.py b/tests/test_index.py index e962dea0..3c92bede 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -2,6 +2,7 @@ from __future__ import annotations +import warnings from pathlib import Path import pytest @@ -47,3 +48,48 @@ def test_index_via_approve(store: KBStore) -> None: approve(store, pr.id, approved_by="u") hits = index_db.search(store.kb_dir, "indexed") assert any(k == "claim" for k, *_ in hits) + + +def test_rebuild_index_preserves_good_claims_when_one_yaml_is_corrupt( + store: KBStore, +) -> None: + """A bad YAML must not destroy the index for healthy claims (Option A fix).""" + src = store.put_source(b"e") + store.put_claim(Claim(id="good", text="healthy claim kept", evidence=[src.id])) + store.put_claim(Claim(id="bad", text="will be corrupted", evidence=[src.id])) + + # Build a clean initial index so state.db exists. + health.rebuild_index(store) + assert index_db.stats(store.kb_dir)["claims"] == 2 + + # Corrupt one YAML file to simulate a truncated write. + (store.kb_dir / "claims" / "bad.yaml").write_text("{ broken: yaml: [") + + # rebuild_index must warn about the skipped file, not crash. + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + health.rebuild_index(store) + + assert any("bad.yaml" in str(w.message) or "bad" in str(w.message) for w in caught), ( + "expected a warning naming the skipped artifact" + ) + + # The healthy claim must still be searchable. + hits = index_db.search(store.kb_dir, "healthy") + assert any(k == "claim" and row_id == "good" for k, row_id, *_ in hits), ( + "healthy claim disappeared from index after corrupt-sibling rebuild" + ) + + +def test_doctor_detects_blown_index(store: KBStore) -> None: + """doctor() should warn when state.db is empty but artifact files exist.""" + src = store.put_source(b"e") + store.put_claim(Claim(id="c1", text="detectable claim", evidence=[src.id])) + + # Build the index so state.db exists, then wipe FTS rows to simulate the bug. + health.rebuild_index(store) + index_db.reset(store.kb_dir) + + report = health.doctor(store) + codes = {f.code for f in report.findings} + assert "index_blown" in codes