From 6e26f18e3df102df80227b95e51b55f06366db3f Mon Sep 17 00:00:00 2001 From: greatjourney589 Date: Wed, 3 Jun 2026 19:21:03 -0400 Subject: [PATCH 1/4] fix(health,index_db): rebuild_index atomic swap prevents index wipeout on parse error --- src/vouch/health.py | 99 ++++++++++++++++++++++++++++++++++--------- src/vouch/index_db.py | 13 ++++++ tests/test_index.py | 51 ++++++++++++++++++++++ 3 files changed, 142 insertions(+), 21 deletions(-) diff --git a/src/vouch/health.py b/src/vouch/health.py index ac3fe510..7f650f30 100644 --- a/src/vouch/health.py +++ b/src/vouch/health.py @@ -7,14 +7,17 @@ from __future__ import annotations +import shutil +import tempfile +import warnings from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from pathlib import Path from . import index_db from .audit import count_events -from .models import ClaimStatus, ProposalStatus -from .storage import KBStore, sha256_hex +from .models import Claim, ClaimStatus, Entity, Page, ProposalStatus +from .storage import KBStore, _deserialize_page, _yaml_load, sha256_hex from .verify import verify_all @@ -140,14 +143,35 @@ def doctor(store: KBStore) -> HealthReport: "info", "index_missing", "state.db not present — run `vouch index` to build it", )) + else: + # Detect a blown index: DB exists but FTS is empty while artifact files + # are present on disk. This is the aftermath of a crashed rebuild_index. + stats = index_db.stats(store.kb_dir) + claims_on_disk = len(list((store.kb_dir / "claims").glob("*.yaml"))) + pages_on_disk = len(list((store.kb_dir / "pages").glob("*.md"))) + entities_on_disk = len(list((store.kb_dir / "entities").glob("*.yaml"))) + artifacts_on_disk = claims_on_disk + pages_on_disk + entities_on_disk + indexed = stats["claims"] + stats["pages"] + stats["entities"] + if artifacts_on_disk > 0 and indexed == 0: + report.findings.append(Finding( + "warning", "index_blown", + "state.db is empty but artifact files exist on disk — " + "run `vouch index` to rebuild the search index", + )) report.ok = not any(f.severity == "error" for f in report.findings) return report def rebuild_index(store: KBStore) -> dict: - """Drop and rebuild state.db from the durable files. Idempotent.""" - # Detect a stale embedding-model identity before reset() wipes the meta. + """Rebuild state.db from durable files, atomically. + + Writes into a temp file in the same directory (so the final rename is + atomic on any POSIX filesystem), then swaps it into place only on full + success. If anything raises — bad YAML, validation error, I/O — the + temp file is deleted and the existing index is left untouched. + """ + # Detect a stale embedding-model identity before we discard the old meta. try: from . import audit from .embeddings.migration import detect_mismatch @@ -160,23 +184,56 @@ def rebuild_index(store: KBStore) -> dict: ) except ImportError: pass - index_db.reset(store.kb_dir) - with index_db.open_db(store.kb_dir) as conn: - for c in store.list_claims(): - index_db.index_claim( - conn, id=c.id, text=c.text, - type=c.type.value, status=c.status.value, tags=c.tags, - ) - for p in store.list_pages(): - index_db.index_page( - conn, id=p.id, title=p.title, body=p.body, - type=p.type.value, tags=p.tags, - ) - for e in store.list_entities(): - index_db.index_entity( - conn, id=e.id, name=e.name, description=e.description, - type=e.type.value, aliases=e.aliases, - ) + + db_path = store.kb_dir / index_db.DB_FILENAME + store.kb_dir.mkdir(parents=True, exist_ok=True) + + with tempfile.NamedTemporaryFile( + dir=store.kb_dir, suffix=".db.tmp", delete=False, + ) as tmp: + tmp_path = Path(tmp.name) + + skipped: list[tuple[str, str]] = [] + try: + with index_db.open_db_at(tmp_path) as conn: + for p in sorted((store.kb_dir / "claims").glob("*.yaml")): + try: + c: Claim = Claim.model_validate(_yaml_load(p.read_text())) + index_db.index_claim( + conn, id=c.id, text=c.text, + type=c.type.value, status=c.status.value, tags=c.tags, + ) + except Exception as exc: + skipped.append((p.name, str(exc))) + for p in sorted((store.kb_dir / "pages").glob("*.md")): + try: + pg: Page = _deserialize_page(p.read_text()) + index_db.index_page( + conn, id=pg.id, title=pg.title, body=pg.body, + type=pg.type.value, tags=pg.tags, + ) + except Exception as exc: + skipped.append((p.name, str(exc))) + for p in sorted((store.kb_dir / "entities").glob("*.yaml")): + try: + e: Entity = Entity.model_validate(_yaml_load(p.read_text())) + index_db.index_entity( + conn, id=e.id, name=e.name, description=e.description, + type=e.type.value, aliases=e.aliases, + ) + except Exception as exc: + skipped.append((p.name, str(exc))) + shutil.move(str(tmp_path), str(db_path)) + except Exception: + tmp_path.unlink(missing_ok=True) + raise + + if skipped: + warnings.warn( + f"rebuild_index skipped {len(skipped)} unreadable artifact(s): {skipped}", + stacklevel=2, + ) + _rebuild_embeddings(store) return index_db.stats(store.kb_dir) diff --git a/src/vouch/index_db.py b/src/vouch/index_db.py index 7d026273..40a13987 100644 --- a/src/vouch/index_db.py +++ b/src/vouch/index_db.py @@ -95,6 +95,19 @@ def open_db(kb_dir: Path): conn.close() +@contextmanager +def open_db_at(path: Path): + """Like open_db but accepts an explicit path — used for atomic temp-file rebuilds.""" + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(path)) + try: + conn.executescript(SCHEMA) + yield conn + conn.commit() + finally: + conn.close() + + def reset(kb_dir: Path) -> None: """Drop everything; the rebuild caller re-populates. diff --git a/tests/test_index.py b/tests/test_index.py index e962dea0..fa01a346 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -2,6 +2,7 @@ from __future__ import annotations +import warnings from pathlib import Path import pytest @@ -47,3 +48,53 @@ def test_index_via_approve(store: KBStore) -> None: approve(store, pr.id, approved_by="u") hits = index_db.search(store.kb_dir, "indexed") assert any(k == "claim" for k, *_ in hits) + + +def test_rebuild_index_preserves_good_claims_when_one_yaml_is_corrupt( + store: KBStore, +) -> None: + """A bad YAML must not destroy the index for healthy claims (Option A fix).""" + src = store.put_source(b"e") + store.put_claim(Claim(id="good", text="healthy claim kept", evidence=[src.id])) + store.put_claim(Claim(id="bad", text="will be corrupted", evidence=[src.id])) + + # Build a clean initial index so state.db exists. + health.rebuild_index(store) + assert index_db.stats(store.kb_dir)["claims"] == 2 + + # Corrupt one YAML file to simulate a truncated write. + (store.kb_dir / "claims" / "bad.yaml").write_text("{ broken: yaml: [") + + # rebuild_index must warn about the skipped file, not crash. + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + health.rebuild_index(store) + + assert any("bad.yaml" in str(w.message) or "bad" in str(w.message) for w in caught), ( + "expected a warning naming the skipped artifact" + ) + + # The healthy claim must still be searchable. + hits = index_db.search(store.kb_dir, "healthy") + assert any(k == "claim" and row_id == "good" for k, row_id, *_ in hits), ( + "healthy claim disappeared from index after corrupt-sibling rebuild" + ) + + +def test_doctor_detects_blown_index(store: KBStore) -> None: + """doctor() should warn when state.db is empty but artifact files exist.""" + src = store.put_source(b"e") + store.put_claim(Claim(id="c1", text="detectable claim", evidence=[src.id])) + + # Build the index so state.db exists, then wipe it to simulate the bug. + health.rebuild_index(store) + with index_db.open_db(store.kb_dir) as conn: + conn.executescript( + "DELETE FROM claims_fts;" + "DELETE FROM pages_fts;" + "DELETE FROM entities_fts;" + ) + + report = health.doctor(store) + codes = {f.code for f in report.findings} + assert "index_blown" in codes From 7f8f4825b772cd515ed9709acc3497838f55dc64 Mon Sep 17 00:00:00 2001 From: greatjourney589 Date: Wed, 3 Jun 2026 19:42:05 -0400 Subject: [PATCH 2/4] fix(health): narrow per-artifact rescue and extend atomicity to embeddings --- src/vouch/health.py | 74 +++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/src/vouch/health.py b/src/vouch/health.py index 7f650f30..feb5cc2e 100644 --- a/src/vouch/health.py +++ b/src/vouch/health.py @@ -199,30 +199,37 @@ def rebuild_index(store: KBStore) -> dict: for p in sorted((store.kb_dir / "claims").glob("*.yaml")): try: c: Claim = Claim.model_validate(_yaml_load(p.read_text())) - index_db.index_claim( - conn, id=c.id, text=c.text, - type=c.type.value, status=c.status.value, tags=c.tags, - ) except Exception as exc: skipped.append((p.name, str(exc))) + continue + index_db.index_claim( + conn, id=c.id, text=c.text, + type=c.type.value, status=c.status.value, tags=c.tags, + ) for p in sorted((store.kb_dir / "pages").glob("*.md")): try: pg: Page = _deserialize_page(p.read_text()) - index_db.index_page( - conn, id=pg.id, title=pg.title, body=pg.body, - type=pg.type.value, tags=pg.tags, - ) except Exception as exc: skipped.append((p.name, str(exc))) + continue + index_db.index_page( + conn, id=pg.id, title=pg.title, body=pg.body, + type=pg.type.value, tags=pg.tags, + ) for p in sorted((store.kb_dir / "entities").glob("*.yaml")): try: e: Entity = Entity.model_validate(_yaml_load(p.read_text())) - index_db.index_entity( - conn, id=e.id, name=e.name, description=e.description, - type=e.type.value, aliases=e.aliases, - ) except Exception as exc: skipped.append((p.name, str(exc))) + continue + index_db.index_entity( + conn, id=e.id, name=e.name, description=e.description, + type=e.type.value, aliases=e.aliases, + ) + # Embeddings are built into the temp DB before the swap so the + # entire operation remains atomic — a failure here aborts the + # rename and the live index is left untouched. + _write_embeddings(conn, store) shutil.move(str(tmp_path), str(db_path)) except Exception: tmp_path.unlink(missing_ok=True) @@ -234,33 +241,40 @@ def rebuild_index(store: KBStore) -> dict: stacklevel=2, ) - _rebuild_embeddings(store) return index_db.stats(store.kb_dir) -def _rebuild_embeddings(store: KBStore) -> None: +def _write_embeddings(conn, store: KBStore) -> None: # type: ignore[no-untyped-def] + """Write embedding vectors into *conn* (caller owns the connection/commit). + + No-ops silently if no embedder is registered or its deps are absent. + """ try: from .embeddings import get_embedder embedder = get_embedder() except Exception: return + texts: list[tuple[str, str, str]] = [] + for c in store.list_claims(): + texts.append(("claim", c.id, c.text)) + for p in store.list_pages(): + texts.append(("page", p.id, f"{p.title} {p.body}")) + for e in store.list_entities(): + texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) + if not texts: + return + batch_size = 64 + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + vecs = embedder.encode_batch([t[2] for t in batch]) + for (kind, eid, _), row in zip(batch, vecs, strict=True): + index_db.index_embedding(conn, kind=kind, id=eid, + vec=row.tolist()) + + +def _rebuild_embeddings(store: KBStore) -> None: with index_db.open_db(store.kb_dir) as conn: - texts: list[tuple[str, str, str]] = [] - for c in store.list_claims(): - texts.append(("claim", c.id, c.text)) - for p in store.list_pages(): - texts.append(("page", p.id, f"{p.title} {p.body}")) - for e in store.list_entities(): - texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) - if not texts: - return - batch_size = 64 - for i in range(0, len(texts), batch_size): - batch = texts[i:i + batch_size] - vecs = embedder.encode_batch([t[2] for t in batch]) - for (kind, eid, _), row in zip(batch, vecs, strict=True): - index_db.index_embedding(conn, kind=kind, id=eid, - vec=row.tolist()) + _write_embeddings(conn, store) # --- helpers used by `vouch discover` (CLI) ------------------------------- From 68c7ee9a319ccae741d8eeba3cb31c00b7633845 Mon Sep 17 00:00:00 2001 From: greatjourney589 Date: Tue, 9 Jun 2026 03:19:53 -0400 Subject: [PATCH 3/4] fix(health): remove dead _rebuild_embeddings and reuse parsed artifacts --- src/vouch/health.py | 51 +++++++++++++++++++++++++++------------------ tests/test_index.py | 9 ++------ 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/vouch/health.py b/src/vouch/health.py index feb5cc2e..3049e0ee 100644 --- a/src/vouch/health.py +++ b/src/vouch/health.py @@ -194,34 +194,44 @@ def rebuild_index(store: KBStore) -> dict: tmp_path = Path(tmp.name) skipped: list[tuple[str, str]] = [] + claims: list[Claim] = [] + pages: list[Page] = [] + entities: list[Entity] = [] try: with index_db.open_db_at(tmp_path) as conn: - for p in sorted((store.kb_dir / "claims").glob("*.yaml")): + for claim_path in sorted((store.kb_dir / "claims").glob("*.yaml")): try: - c: Claim = Claim.model_validate(_yaml_load(p.read_text())) + c: Claim = Claim.model_validate( + _yaml_load(claim_path.read_text()), + ) except Exception as exc: - skipped.append((p.name, str(exc))) + skipped.append((claim_path.name, str(exc))) continue + claims.append(c) index_db.index_claim( conn, id=c.id, text=c.text, type=c.type.value, status=c.status.value, tags=c.tags, ) - for p in sorted((store.kb_dir / "pages").glob("*.md")): + for page_path in sorted((store.kb_dir / "pages").glob("*.md")): try: - pg: Page = _deserialize_page(p.read_text()) + pg: Page = _deserialize_page(page_path.read_text()) except Exception as exc: - skipped.append((p.name, str(exc))) + skipped.append((page_path.name, str(exc))) continue + pages.append(pg) index_db.index_page( conn, id=pg.id, title=pg.title, body=pg.body, type=pg.type.value, tags=pg.tags, ) - for p in sorted((store.kb_dir / "entities").glob("*.yaml")): + for entity_path in sorted((store.kb_dir / "entities").glob("*.yaml")): try: - e: Entity = Entity.model_validate(_yaml_load(p.read_text())) + e: Entity = Entity.model_validate( + _yaml_load(entity_path.read_text()), + ) except Exception as exc: - skipped.append((p.name, str(exc))) + skipped.append((entity_path.name, str(exc))) continue + entities.append(e) index_db.index_entity( conn, id=e.id, name=e.name, description=e.description, type=e.type.value, aliases=e.aliases, @@ -229,7 +239,7 @@ def rebuild_index(store: KBStore) -> dict: # Embeddings are built into the temp DB before the swap so the # entire operation remains atomic — a failure here aborts the # rename and the live index is left untouched. - _write_embeddings(conn, store) + _write_embeddings(conn, claims=claims, pages=pages, entities=entities) shutil.move(str(tmp_path), str(db_path)) except Exception: tmp_path.unlink(missing_ok=True) @@ -244,7 +254,13 @@ def rebuild_index(store: KBStore) -> dict: return index_db.stats(store.kb_dir) -def _write_embeddings(conn, store: KBStore) -> None: # type: ignore[no-untyped-def] +def _write_embeddings( # type: ignore[no-untyped-def] + conn, + *, + claims: list[Claim], + pages: list[Page], + entities: list[Entity], +) -> None: """Write embedding vectors into *conn* (caller owns the connection/commit). No-ops silently if no embedder is registered or its deps are absent. @@ -255,11 +271,11 @@ def _write_embeddings(conn, store: KBStore) -> None: # type: ignore[no-untyped- except Exception: return texts: list[tuple[str, str, str]] = [] - for c in store.list_claims(): + for c in claims: texts.append(("claim", c.id, c.text)) - for p in store.list_pages(): - texts.append(("page", p.id, f"{p.title} {p.body}")) - for e in store.list_entities(): + for pg in pages: + texts.append(("page", pg.id, f"{pg.title} {pg.body}")) + for e in entities: texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) if not texts: return @@ -272,11 +288,6 @@ def _write_embeddings(conn, store: KBStore) -> None: # type: ignore[no-untyped- vec=row.tolist()) -def _rebuild_embeddings(store: KBStore) -> None: - with index_db.open_db(store.kb_dir) as conn: - _write_embeddings(conn, store) - - # --- helpers used by `vouch discover` (CLI) ------------------------------- def hash_path(p: Path) -> str: diff --git a/tests/test_index.py b/tests/test_index.py index fa01a346..3c92bede 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -86,14 +86,9 @@ def test_doctor_detects_blown_index(store: KBStore) -> None: src = store.put_source(b"e") store.put_claim(Claim(id="c1", text="detectable claim", evidence=[src.id])) - # Build the index so state.db exists, then wipe it to simulate the bug. + # Build the index so state.db exists, then wipe FTS rows to simulate the bug. health.rebuild_index(store) - with index_db.open_db(store.kb_dir) as conn: - conn.executescript( - "DELETE FROM claims_fts;" - "DELETE FROM pages_fts;" - "DELETE FROM entities_fts;" - ) + index_db.reset(store.kb_dir) report = health.doctor(store) codes = {f.code for f in report.findings} From 53f1d7dac590904a2fea1934159038950019e543 Mon Sep 17 00:00:00 2001 From: greatjourney589 Date: Tue, 9 Jun 2026 03:34:31 -0400 Subject: [PATCH 4/4] fix(health): only skip embeddings when embedder is unavailable --- src/vouch/health.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/vouch/health.py b/src/vouch/health.py index 3049e0ee..17b6292c 100644 --- a/src/vouch/health.py +++ b/src/vouch/health.py @@ -263,12 +263,17 @@ def _write_embeddings( # type: ignore[no-untyped-def] ) -> None: """Write embedding vectors into *conn* (caller owns the connection/commit). - No-ops silently if no embedder is registered or its deps are absent. + No-ops when the embeddings extra is not installed or no embedder is + registered. Other embedder startup failures propagate so rebuild_index + does not silently swap in an index with an empty embedding_index. """ try: from .embeddings import get_embedder + except ImportError: + return + try: embedder = get_embedder() - except Exception: + except (KeyError, ImportError): return texts: list[tuple[str, str, str]] = [] for c in claims: