-
Notifications
You must be signed in to change notification settings - Fork 26
fix(health,index_db): rebuild_index atomic swap prevents index wipeou… #160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6e26f18
7f8f482
68c7ee9
53f1d7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,14 +7,17 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import shutil | ||
| import tempfile | ||
| import warnings | ||
| from dataclasses import dataclass, field | ||
| from datetime import UTC, datetime, timedelta | ||
| from pathlib import Path | ||
|
|
||
| from . import index_db | ||
| from .audit import count_events | ||
| from .models import ClaimStatus, ProposalStatus | ||
| from .storage import KBStore, sha256_hex | ||
| from .models import Claim, ClaimStatus, Entity, Page, ProposalStatus | ||
| from .storage import KBStore, _deserialize_page, _yaml_load, sha256_hex | ||
| from .verify import verify_all | ||
|
|
||
|
|
||
|
|
@@ -140,14 +143,35 @@ def doctor(store: KBStore) -> HealthReport: | |
| "info", "index_missing", | ||
| "state.db not present — run `vouch index` to build it", | ||
| )) | ||
| else: | ||
| # Detect a blown index: DB exists but FTS is empty while artifact files | ||
| # are present on disk. This is the aftermath of a crashed rebuild_index. | ||
| stats = index_db.stats(store.kb_dir) | ||
| claims_on_disk = len(list((store.kb_dir / "claims").glob("*.yaml"))) | ||
| pages_on_disk = len(list((store.kb_dir / "pages").glob("*.md"))) | ||
| entities_on_disk = len(list((store.kb_dir / "entities").glob("*.yaml"))) | ||
| artifacts_on_disk = claims_on_disk + pages_on_disk + entities_on_disk | ||
| indexed = stats["claims"] + stats["pages"] + stats["entities"] | ||
| if artifacts_on_disk > 0 and indexed == 0: | ||
| report.findings.append(Finding( | ||
| "warning", "index_blown", | ||
| "state.db is empty but artifact files exist on disk — " | ||
| "run `vouch index` to rebuild the search index", | ||
| )) | ||
|
|
||
| report.ok = not any(f.severity == "error" for f in report.findings) | ||
| return report | ||
|
|
||
|
|
||
| def rebuild_index(store: KBStore) -> dict: | ||
| """Drop and rebuild state.db from the durable files. Idempotent.""" | ||
| # Detect a stale embedding-model identity before reset() wipes the meta. | ||
| """Rebuild state.db from durable files, atomically. | ||
|
|
||
| Writes into a temp file in the same directory (so the final rename is | ||
| atomic on any POSIX filesystem), then swaps it into place only on full | ||
| success. If anything raises — bad YAML, validation error, I/O — the | ||
| temp file is deleted and the existing index is left untouched. | ||
| """ | ||
| # Detect a stale embedding-model identity before we discard the old meta. | ||
| try: | ||
| from . import audit | ||
| from .embeddings.migration import detect_mismatch | ||
|
|
@@ -160,50 +184,113 @@ def rebuild_index(store: KBStore) -> dict: | |
| ) | ||
| except ImportError: | ||
| pass | ||
| index_db.reset(store.kb_dir) | ||
| with index_db.open_db(store.kb_dir) as conn: | ||
| for c in store.list_claims(): | ||
| index_db.index_claim( | ||
| conn, id=c.id, text=c.text, | ||
| type=c.type.value, status=c.status.value, tags=c.tags, | ||
| ) | ||
| for p in store.list_pages(): | ||
| index_db.index_page( | ||
| conn, id=p.id, title=p.title, body=p.body, | ||
| type=p.type.value, tags=p.tags, | ||
| ) | ||
| for e in store.list_entities(): | ||
| index_db.index_entity( | ||
| conn, id=e.id, name=e.name, description=e.description, | ||
| type=e.type.value, aliases=e.aliases, | ||
| ) | ||
| _rebuild_embeddings(store) | ||
|
|
||
| db_path = store.kb_dir / index_db.DB_FILENAME | ||
| store.kb_dir.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| with tempfile.NamedTemporaryFile( | ||
| dir=store.kb_dir, suffix=".db.tmp", delete=False, | ||
| ) as tmp: | ||
|
Comment on lines
+191
to
+193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If the process is killed after creating this temp DB but before cleanup, the leftover file is named like Useful? React with 👍 / 👎. |
||
| tmp_path = Path(tmp.name) | ||
|
|
||
| skipped: list[tuple[str, str]] = [] | ||
| claims: list[Claim] = [] | ||
| pages: list[Page] = [] | ||
| entities: list[Entity] = [] | ||
| try: | ||
| with index_db.open_db_at(tmp_path) as conn: | ||
| for claim_path in sorted((store.kb_dir / "claims").glob("*.yaml")): | ||
| try: | ||
| c: Claim = Claim.model_validate( | ||
| _yaml_load(claim_path.read_text()), | ||
| ) | ||
| except Exception as exc: | ||
| skipped.append((claim_path.name, str(exc))) | ||
| continue | ||
|
Comment on lines
+207
to
+209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If Useful? React with 👍 / 👎. |
||
| claims.append(c) | ||
| index_db.index_claim( | ||
| conn, id=c.id, text=c.text, | ||
| type=c.type.value, status=c.status.value, tags=c.tags, | ||
| ) | ||
| for page_path in sorted((store.kb_dir / "pages").glob("*.md")): | ||
| try: | ||
| pg: Page = _deserialize_page(page_path.read_text()) | ||
| except Exception as exc: | ||
| skipped.append((page_path.name, str(exc))) | ||
| continue | ||
| pages.append(pg) | ||
| index_db.index_page( | ||
| conn, id=pg.id, title=pg.title, body=pg.body, | ||
| type=pg.type.value, tags=pg.tags, | ||
| ) | ||
| for entity_path in sorted((store.kb_dir / "entities").glob("*.yaml")): | ||
| try: | ||
| e: Entity = Entity.model_validate( | ||
| _yaml_load(entity_path.read_text()), | ||
| ) | ||
| except Exception as exc: | ||
| skipped.append((entity_path.name, str(exc))) | ||
| continue | ||
| entities.append(e) | ||
| index_db.index_entity( | ||
| conn, id=e.id, name=e.name, description=e.description, | ||
| type=e.type.value, aliases=e.aliases, | ||
| ) | ||
| # Embeddings are built into the temp DB before the swap so the | ||
| # entire operation remains atomic — a failure here aborts the | ||
| # rename and the live index is left untouched. | ||
| _write_embeddings(conn, claims=claims, pages=pages, entities=entities) | ||
| shutil.move(str(tmp_path), str(db_path)) | ||
| except Exception: | ||
| tmp_path.unlink(missing_ok=True) | ||
| raise | ||
|
|
||
| if skipped: | ||
| warnings.warn( | ||
| f"rebuild_index skipped {len(skipped)} unreadable artifact(s): {skipped}", | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| return index_db.stats(store.kb_dir) | ||
|
greatjourney589 marked this conversation as resolved.
|
||
|
|
||
|
|
||
| def _rebuild_embeddings(store: KBStore) -> None: | ||
| def _write_embeddings( # type: ignore[no-untyped-def] | ||
| conn, | ||
| *, | ||
| claims: list[Claim], | ||
| pages: list[Page], | ||
| entities: list[Entity], | ||
| ) -> None: | ||
| """Write embedding vectors into *conn* (caller owns the connection/commit). | ||
|
|
||
| No-ops when the embeddings extra is not installed or no embedder is | ||
| registered. Other embedder startup failures propagate so rebuild_index | ||
| does not silently swap in an index with an empty embedding_index. | ||
| """ | ||
| try: | ||
| from .embeddings import get_embedder | ||
| except ImportError: | ||
| return | ||
| try: | ||
| embedder = get_embedder() | ||
| except Exception: | ||
| except (KeyError, ImportError): | ||
| return | ||
|
coderabbitai[bot] marked this conversation as resolved.
Comment on lines
275
to
+277
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With an embedder registered, any non- Useful? React with 👍 / 👎. |
||
| texts: list[tuple[str, str, str]] = [] | ||
| for c in claims: | ||
| texts.append(("claim", c.id, c.text)) | ||
| for pg in pages: | ||
| texts.append(("page", pg.id, f"{pg.title} {pg.body}")) | ||
| for e in entities: | ||
| texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) | ||
| if not texts: | ||
| return | ||
| with index_db.open_db(store.kb_dir) as conn: | ||
| texts: list[tuple[str, str, str]] = [] | ||
| for c in store.list_claims(): | ||
| texts.append(("claim", c.id, c.text)) | ||
| for p in store.list_pages(): | ||
| texts.append(("page", p.id, f"{p.title} {p.body}")) | ||
| for e in store.list_entities(): | ||
| texts.append(("entity", e.id, f"{e.name} {e.description or ''}")) | ||
| if not texts: | ||
| return | ||
| batch_size = 64 | ||
| for i in range(0, len(texts), batch_size): | ||
| batch = texts[i:i + batch_size] | ||
| vecs = embedder.encode_batch([t[2] for t in batch]) | ||
| for (kind, eid, _), row in zip(batch, vecs, strict=True): | ||
| index_db.index_embedding(conn, kind=kind, id=eid, | ||
| vec=row.tolist()) | ||
| batch_size = 64 | ||
| for i in range(0, len(texts), batch_size): | ||
| batch = texts[i:i + batch_size] | ||
| vecs = embedder.encode_batch([t[2] for t in batch]) | ||
| for (kind, eid, _), row in zip(batch, vecs, strict=True): | ||
| index_db.index_embedding(conn, kind=kind, id=eid, | ||
| vec=row.tolist()) | ||
|
|
||
|
|
||
| # --- helpers used by `vouch discover` (CLI) ------------------------------- | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
state.dbexists but is corrupt or not a SQLite database (for example after an interrupted write or accidental overwrite), this newindex_db.stats()call raises asqlite3.DatabaseError, sovouch doctoraborts instead of returning a health report that can tell the user to rebuild the derived index. Since the index is explicitly treated as disposable, this path should catch stats/open failures and report a warning/error rather than crashing.Useful? React with 👍 / 👎.