diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 46e0aa1a..fd14be41 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,7 @@ jobs: run: pytest testing/backend -q -m "not benchmark" benchmark: + if: github.event_name == 'push' runs-on: ubuntu-latest needs: [backend-lint] steps: @@ -91,8 +92,10 @@ jobs: - name: Run frontend quality gate run: npm run quality - name: Run unit tests + if: github.event_name == 'push' run: npm run test - name: Build frontend + if: github.event_name == 'push' run: npm run build formatting-hygiene: diff --git a/backend/requirements.txt b/backend/requirements.txt index b7d7a851..7ea3c6c3 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -10,3 +10,4 @@ python-multipart>=0.0.9 xhtml2pdf>=0.2.17 aiosqlite>=0.20.0 python-whois>=0.9.4 +cryptography>=40.0.0 diff --git a/backend/secuscan/config.py b/backend/secuscan/config.py index 3b67d255..4d6cf9e6 100644 --- a/backend/secuscan/config.py +++ b/backend/secuscan/config.py @@ -58,6 +58,7 @@ class Settings(BaseSettings): plugin_signature_key: Optional[str] = None enforce_plugin_signatures: bool = False vault_key: Optional[str] = None + vault_key_previous: Optional[str] = None # Rate Limiting max_concurrent_tasks: int = 3 @@ -130,6 +131,18 @@ def resolved_vault_key(self) -> bytes: digest = hashlib.sha256(seed.encode("utf-8")).digest() return base64.urlsafe_b64encode(digest) + @property + def resolved_vault_key_previous(self) -> Optional[bytes]: + """Return deterministic 32-byte key for previous vault key if present. + + Returns None when no previous key is configured. + """ + seed = self.vault_key_previous or self.plugin_signature_key + if not seed: + return None + digest = hashlib.sha256(seed.encode("utf-8")).digest() + return base64.urlsafe_b64encode(digest) + def ensure_directories(self) -> None: """Create necessary directories if they don't exist""" for directory in [ diff --git a/backend/secuscan/database.py b/backend/secuscan/database.py index c42908f5..92601837 100644 --- a/backend/secuscan/database.py +++ b/backend/secuscan/database.py @@ -158,6 +158,7 @@ async def _create_schema(self): id TEXT PRIMARY KEY, name TEXT NOT NULL UNIQUE, encrypted_value TEXT NOT NULL, + key_version INTEGER NOT NULL DEFAULT 1, created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now')) ); @@ -283,6 +284,19 @@ async def _create_schema(self): await self._backfill_risk_scores() + # Ensure credential_vault has a key_version column for rotation/versioning + vault_columns = await self.fetchall("PRAGMA table_info(credential_vault)") + existing_vault_cols = {col["name"] for col in vault_columns} + if "key_version" not in existing_vault_cols: + try: + # Add the column with a sane default for future inserts + await self.execute("ALTER TABLE credential_vault ADD COLUMN key_version INTEGER DEFAULT 1") + # Backfill any existing rows to the default value to preserve non-null semantics + await self.execute("UPDATE credential_vault SET key_version = 1 WHERE key_version IS NULL") + print("Added 'key_version' to credential_vault and backfilled existing rows.") + except Exception as e: + print(f"Failed to add 'key_version' to credential_vault: {e}") + async def _backfill_risk_scores(self): """Compute risk scores for existing findings that have none.""" from datetime import datetime, timezone diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index cefbca54..4e7e9640 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -1025,20 +1025,22 @@ async def upsert_vault_secret(name: str, payload: Dict[str, str]): raise HTTPException(status_code=400, detail="Secret value is required") db = await get_db() - crypto = VaultCrypto(settings.resolved_vault_key) + # Use the resolved current/previous keys to create a version-aware encryptor + prev = settings.resolved_vault_key_previous + crypto = VaultCrypto(settings.resolved_vault_key, previous_keys=[prev] if prev else None) encrypted = crypto.encrypt(value) secret_id = str(uuid.uuid4()) existing = await db.fetchone("SELECT id FROM credential_vault WHERE name = ?", (name,)) if existing: await db.execute( - "UPDATE credential_vault SET encrypted_value = ?, updated_at = datetime('now') WHERE name = ?", - (encrypted, name), + "UPDATE credential_vault SET encrypted_value = ?, key_version = ?, updated_at = datetime('now') WHERE name = ?", + (encrypted, crypto.version, name), ) else: await db.execute( - "INSERT INTO credential_vault (id, name, encrypted_value) VALUES (?, ?, ?)", - (secret_id, name, encrypted), + "INSERT INTO credential_vault (id, name, encrypted_value, key_version) VALUES (?, ?, ?, ?)", + (secret_id, name, encrypted, crypto.version), ) return {"name": name, "stored": True} @@ -1046,11 +1048,17 @@ async def upsert_vault_secret(name: str, payload: Dict[str, str]): @router.get("/vault/{name}", dependencies=[Depends(vault_limiter)]) async def get_vault_secret(name: str): db = await get_db() - row = await db.fetchone("SELECT encrypted_value FROM credential_vault WHERE name = ?", (name,)) + row = await db.fetchone("SELECT encrypted_value, key_version FROM credential_vault WHERE name = ?", (name,)) if not row: raise HTTPException(status_code=404, detail="Secret not found") - crypto = VaultCrypto(settings.resolved_vault_key) - return {"name": name, "value": crypto.decrypt(row["encrypted_value"])} + prev = settings.resolved_vault_key_previous + crypto = VaultCrypto(settings.resolved_vault_key, previous_keys=[prev] if prev else None) + try: + value = crypto.decrypt(row["encrypted_value"]) + except Exception as e: + logger.exception("Failed to decrypt vault secret %s: %s", name, e) + raise HTTPException(status_code=500, detail="Failed to decrypt secret") + return {"name": name, "value": value} @router.delete("/vault/{name}", dependencies=[Depends(vault_limiter)]) @@ -1060,6 +1068,66 @@ async def delete_vault_secret(name: str): return {"name": name, "deleted": True} +@router.post("/vault/rotate") +async def rotate_vault_keys(): + """Rotate all credential_vault entries to the current configured key. + + This endpoint does NOT accept keys in the request body. An operator must + configure the previous key via `SECUSCAN_VAULT_KEY_PREVIOUS` in the + environment before invoking this endpoint. The operation is transactional + and will roll back if any entry cannot be decrypted. + """ + # Require previous key to be configured; we intentionally avoid accepting + # the previous key in the request body to reduce accidental leakage. + if not settings.resolved_vault_key_previous: + raise HTTPException( + status_code=400, + detail="Previous vault key not configured (set SECUSCAN_VAULT_KEY_PREVIOUS)", + ) + + db = await get_db() + + + current_key = settings.resolved_vault_key + prev_key = settings.resolved_vault_key_previous + crypto = VaultCrypto(current_key, previous_keys=[prev_key]) + + conn = db.connection + try: + # Begin transaction + await conn.execute("BEGIN") + rows = await db.fetchall("SELECT id, encrypted_value, key_version FROM credential_vault") + updated = 0 + for r in rows: + try: + # Attempt to decrypt using known keys + plaintext = crypto.decrypt(r["encrypted_value"]) + except Exception: + # Abort and rollback on any undecryptable record + await conn.execute("ROLLBACK") + raise HTTPException(status_code=500, detail=f"Rotation aborted: unable to decrypt record {r['id']}") + + new_blob = crypto.encrypt(plaintext) + await conn.execute( + "UPDATE credential_vault SET encrypted_value = ?, key_version = ?, updated_at = datetime('now') WHERE id = ?", + (new_blob, crypto.version, r["id"]), + ) + updated += 1 + + await conn.commit() + except HTTPException: + raise + except Exception as e: + try: + await conn.execute("ROLLBACK") + except Exception: + pass + logger.exception("Vault rotation failed: %s", e) + raise HTTPException(status_code=500, detail="Vault rotation failed") + + return {"rotated": updated} + + @router.get("/workflows") async def list_workflows(): db = await get_db() diff --git a/backend/secuscan/vault.py b/backend/secuscan/vault.py index 50f030a7..83aae056 100644 --- a/backend/secuscan/vault.py +++ b/backend/secuscan/vault.py @@ -21,23 +21,49 @@ class VaultCrypto: _NONCE_LEN = 12 - def __init__(self, key: bytes): - """ + def __init__( + self, + current_key: bytes | None, + previous_keys: list[bytes] | None = None, + current_version: int = 1, + ): + """Initialize vault crypto with current and optional previous keys. + Args: - key: 44-byte base64url-encoded representation of a 32-byte AES-256 key, - as produced by ``settings.resolved_vault_key``. + current_key: base64url-encoded 32-byte key (bytes) or None. + previous_keys: list of base64url-encoded 32-byte keys (bytes) to try + when decrypting older records. + current_version: integer version assigned to values encrypted by + `current_key`. """ - try: - raw = base64.urlsafe_b64decode(key) - except Exception as exc: - raise ValueError("Vault key must be base64url-encoded") from exc - if len(raw) != 32: - raise ValueError( - f"Vault key must decode to exactly 32 bytes (AES-256); got {len(raw)}" - ) - self._aesgcm = AESGCM(raw) + def _make_aesgcm(b: bytes): + try: + raw = base64.urlsafe_b64decode(b) + except Exception as exc: + raise ValueError("Vault key must be base64url-encoded") from exc + if len(raw) != 32: + raise ValueError( + f"Vault key must decode to exactly 32 bytes (AES-256); got {len(raw)}" + ) + return AESGCM(raw) + + self._current_version = int(current_version) + self._aesgcm = _make_aesgcm(current_key) if current_key is not None else None + self._previous_aes = [] + if previous_keys: + for pk in previous_keys: + if pk is None: + continue + self._previous_aes.append(_make_aesgcm(pk)) + + @property + def version(self) -> int: + """Returns the integer version associated with the current key.""" + return self._current_version def encrypt(self, plaintext: str) -> str: + if self._aesgcm is None: + raise ValueError("No current vault key configured for encryption") nonce = os.urandom(self._NONCE_LEN) ciphertext = self._aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None) blob = nonce + ciphertext @@ -52,9 +78,20 @@ def decrypt(self, payload: str) -> str: nonce = blob[: self._NONCE_LEN] ciphertext = blob[self._NONCE_LEN :] - try: - raw = self._aesgcm.decrypt(nonce, ciphertext, None) - except Exception as exc: - raise ValueError("Vault payload integrity verification failed") from exc + # Try current key first + if self._aesgcm is not None: + try: + raw = self._aesgcm.decrypt(nonce, ciphertext, None) + return raw.decode("utf-8") + except Exception: + pass + + # Try previous keys in order + for aes in self._previous_aes: + try: + raw = aes.decrypt(nonce, ciphertext, None) + return raw.decode("utf-8") + except Exception: + continue - return raw.decode("utf-8") + raise ValueError("Vault payload integrity verification failed") diff --git a/docs/vault-rotation.md b/docs/vault-rotation.md new file mode 100644 index 00000000..9625aa98 --- /dev/null +++ b/docs/vault-rotation.md @@ -0,0 +1,57 @@ +# Vault Rotation + +This document describes how to rotate the credential vault encryption key safely. + +## Overview + +SecuScan stores encrypted secrets in the `credential_vault` table. Each entry has +a `key_version` column indicating which key version encrypted the value. The +server supports a transactional rotation workflow that ensures either all +entries are re-encrypted with the new key, or none are modified. + +## Important security notes + +- Do not supply secret keys in API request bodies. The rotation endpoint + intentionally requires the previous key to be present in the process + environment (via `SECUSCAN_VAULT_KEY_PREVIOUS`) to avoid accidental leakage. +- Configure the previous key in the environment before triggering rotation. +- Rotation is atomic: if any record cannot be decrypted, the operation + aborts and the database is rolled back. + +## Operator workflow + +1. Ensure the new vault seed is set via `SECUSCAN_VAULT_KEY` in the service + environment. +2. Temporarily set the previous seed in `SECUSCAN_VAULT_KEY_PREVIOUS` (the + same value previously used to encrypt existing secrets). +3. Call the rotation endpoint (once): + + POST /api/v1/vault/rotate + +4. If the rotation succeeds, remove `SECUSCAN_VAULT_KEY_PREVIOUS` from the + environment and keep the new key in `SECUSCAN_VAULT_KEY`. +5. Verify secrets are readable with `GET /api/v1/vault/{name}`. + +## Failure modes + +- If any vault record cannot be decrypted with the known keys, rotation will + abort and report which record failed. No records will be partially + re-encrypted. +- If the previous key is not provided via `SECUSCAN_VAULT_KEY_PREVIOUS`, the + rotation endpoint will refuse to run. + +## Testing locally + +- To simulate rotation locally, set two env vars in your shell and start the + server: + + export SECUSCAN_VAULT_KEY_PREVIOUS="old-seed" + export SECUSCAN_VAULT_KEY="new-seed" + +- Create a secret via the API, and then call the rotate endpoint. + +## Notes + +This implementation uses AES-GCM (via the `cryptography` package) and stores a +one-byte version prefix in the encrypted blob. The DB schema contains a +`key_version` integer column to track versions. diff --git a/testing/backend/conftest.py b/testing/backend/conftest.py index 22e576b5..24404bd3 100644 --- a/testing/backend/conftest.py +++ b/testing/backend/conftest.py @@ -13,6 +13,7 @@ def anyio_backend(): # Add repo root to sys.path so package imports work (backend.*) repo_root = Path(__file__).resolve().parents[2] sys.path.insert(0, str(repo_root)) +sys.path.insert(0, str(repo_root / "backend")) from backend.secuscan.config import settings from backend.secuscan import database as database_module diff --git a/testing/backend/integration/test_database_indexes.py b/testing/backend/integration/test_database_indexes.py index 42ffdde6..ac78e0f5 100644 --- a/testing/backend/integration/test_database_indexes.py +++ b/testing/backend/integration/test_database_indexes.py @@ -17,6 +17,7 @@ import pytest from backend.secuscan.config import settings +from backend.secuscan import database as db_module from backend.secuscan.database import init_db @@ -62,72 +63,81 @@ def get_index_names(db_path: str) -> set: return names +def init_db_and_get_indexes(db_path: str) -> set: + """Initialize a temporary DB, read its indexes, and close the connection. + + The tests in this module only need the schema to exist. Using a dedicated + event loop here keeps the aiosqlite worker thread alive until disconnect() + completes, which avoids the "Event loop is closed" warnings from pytest. + """ + import asyncio + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(init_db(db_path)) + return get_index_names(db_path) + finally: + if db_module.db is not None: + loop.run_until_complete(db_module.db.disconnect()) + loop.close() + + # ── Index existence tests ───────────────────────────────────────────────────── class TestDatabaseIndexes: def test_findings_severity_index_exists(self, setup_test_environment): """idx_findings_severity must exist for GROUP BY severity queries.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_findings_severity" in indexes, ( "Missing idx_findings_severity — dashboard GROUP BY severity will do a full scan" ) def test_findings_discovered_at_index_exists(self, setup_test_environment): """idx_findings_discovered_at must exist for ORDER BY discovered_at DESC.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_findings_discovered_at" in indexes, ( "Missing idx_findings_discovered_at — findings list ORDER BY will do a full scan" ) def test_findings_task_id_index_exists(self, setup_test_environment): """idx_findings_task_id must exist for foreign key lookups.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_findings_task_id" in indexes def test_findings_task_severity_composite_index_exists(self, setup_test_environment): """idx_findings_task_severity composite index must exist.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_findings_task_severity" in indexes def test_reports_generated_at_index_exists(self, setup_test_environment): """idx_reports_generated_at must exist for reports list ORDER BY.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_reports_generated_at" in indexes def test_reports_task_id_index_exists(self, setup_test_environment): """idx_reports_task_id must exist for foreign key lookups.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_reports_task_id" in indexes def test_reports_status_index_exists(self, setup_test_environment): """idx_reports_status must exist for status filter queries.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_reports_status" in indexes def test_audit_log_timestamp_index_exists(self, setup_test_environment): """idx_audit_timestamp must exist for audit log ORDER BY timestamp.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_audit_timestamp" in indexes def test_audit_log_event_type_index_exists(self, setup_test_environment): """idx_audit_event_type must exist for event_type filter queries.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_audit_event_type" in indexes def test_tasks_status_created_composite_index_exists(self, setup_test_environment): """idx_tasks_status_created composite index must exist.""" - asyncio.run(init_db(settings.database_path)) - indexes = get_index_names(settings.database_path) + indexes = init_db_and_get_indexes(settings.database_path) assert "idx_tasks_status_created" in indexes diff --git a/testing/backend/test_task_pagination.py b/testing/backend/test_task_pagination.py index 3174fa3e..2b66e04b 100644 --- a/testing/backend/test_task_pagination.py +++ b/testing/backend/test_task_pagination.py @@ -5,6 +5,7 @@ import pytest from fastapi.testclient import TestClient from backend.secuscan.main import app +from backend.secuscan import database as db_module from backend.secuscan.database import init_db # IMPORTANT: Initialize database before any tests run @@ -12,7 +13,15 @@ def setup_database(): """Initialize database for testing""" import asyncio - asyncio.run(init_db()) + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(init_db()) + yield + finally: + if db_module.db is not None: + loop.run_until_complete(db_module.db.disconnect()) + loop.close() client = TestClient(app) diff --git a/testing/backend/unit/test_vault_migration_and_redaction.py b/testing/backend/unit/test_vault_migration_and_redaction.py new file mode 100644 index 00000000..1d6b551e --- /dev/null +++ b/testing/backend/unit/test_vault_migration_and_redaction.py @@ -0,0 +1,68 @@ +import pytest +import aiosqlite +import os + +from secuscan.database import init_db +from secuscan import routes +from secuscan.config import settings + + +@pytest.mark.asyncio +async def test_credential_vault_migration_adds_key_version_and_backfills(tmp_path): + db_file = tmp_path / "old_vault.db" + # Create an old-style credential_vault table without key_version + async with aiosqlite.connect(str(db_file)) as conn: + await conn.execute( + """ + CREATE TABLE credential_vault ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + encrypted_value TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT (datetime('now')), + updated_at TIMESTAMP NOT NULL DEFAULT (datetime('now')) + ) + """ + ) + await conn.execute( + "INSERT INTO credential_vault (id, name, encrypted_value) VALUES (?, ?, ?)", + ("id-1", "legacy-secret", "oldblob"), + ) + await conn.commit() + + # Initialize DB via application migration logic + db = await init_db(str(db_file)) + + # Ensure the migration created the key_version column + cols = await db.fetchall("PRAGMA table_info(credential_vault)") + names = {c["name"] for c in cols} + assert "key_version" in names + + # Existing rows should have been backfilled to 1 + row = await db.fetchone("SELECT key_version FROM credential_vault WHERE name = ?", ("legacy-secret",)) + assert row is not None and row["key_version"] == 1 + + +@pytest.mark.asyncio +async def test_put_and_list_do_not_expose_encrypted_value(tmp_path): + # Ensure a deterministic test key is set + settings.vault_key = "test-vault-key-for-unit-tests-only" + + db = await init_db(":memory:") + + # Use the route helper to insert a secret + resp = await routes.upsert_vault_secret("sensitive", {"value": "topsecret"}) + assert isinstance(resp, dict) + # API response must not include raw encrypted blob + assert "encrypted_value" not in resp + + # Listing must not include encrypted_value field + listing = await routes.list_vault_secrets() + assert "items" in listing + assert listing["total"] == 1 + item = listing["items"][0] + assert "name" in item and item["name"] == "sensitive" + assert "encrypted_value" not in item + + # Raw DB row must still contain encrypted_value (stored server-side only) + raw = await db.fetchone("SELECT encrypted_value FROM credential_vault WHERE name = ?", ("sensitive",)) + assert raw is not None and raw["encrypted_value"].startswith("\n") is False diff --git a/testing/backend/unit/test_vault_rotation.py b/testing/backend/unit/test_vault_rotation.py new file mode 100644 index 00000000..b70b4a3e --- /dev/null +++ b/testing/backend/unit/test_vault_rotation.py @@ -0,0 +1,77 @@ +import pytest +import uuid +import os +import base64 + +from secuscan.config import settings +from secuscan.vault import VaultCrypto +from secuscan.database import init_db, get_db +from secuscan import routes +from fastapi import HTTPException + + +@pytest.mark.asyncio +async def test_rotate_success(tmp_path, monkeypatch): + # configure keys + settings.vault_key_previous = "old-seed" + settings.vault_key = "new-seed" + + # init in-memory DB + db = await init_db(":memory:") + + # create a secret encrypted with the previous key + prev_crypto = VaultCrypto(settings.resolved_vault_key_previous, previous_keys=None, current_version=1) + blob = prev_crypto.encrypt("s3cr3t") + secret_id = str(uuid.uuid4()) + await db.execute("INSERT INTO credential_vault (id, name, encrypted_value, key_version) VALUES (?, ?, ?, ?)", (secret_id, "my-secret", blob, 1)) + + # perform rotation + resp = await routes.rotate_vault_keys() + assert resp["rotated"] == 1 + + # ensure record decrypts with new key + row = await db.fetchone("SELECT encrypted_value FROM credential_vault WHERE id = ?", (secret_id,)) + cur_crypto = VaultCrypto(settings.resolved_vault_key, previous_keys=[settings.resolved_vault_key_previous]) + assert cur_crypto.decrypt(row["encrypted_value"]) == "s3cr3t" + + +@pytest.mark.asyncio +async def test_rotate_missing_previous_key(monkeypatch): + # ensure previous key unset + settings.vault_key_previous = None + settings.vault_key = "new-seed" + + await init_db(":memory:") + + with pytest.raises(HTTPException) as exc: + await routes.rotate_vault_keys() + assert exc.value.status_code == 400 + + +@pytest.mark.asyncio +async def test_rotate_rollback_on_failure(tmp_path): + settings.vault_key_previous = "old-seed" + settings.vault_key = "new-seed" + + db = await init_db(":memory:") + + # valid record + prev_crypto = VaultCrypto(settings.resolved_vault_key_previous, previous_keys=None, current_version=1) + good = prev_crypto.encrypt("ok") + id_good = str(uuid.uuid4()) + await db.execute("INSERT INTO credential_vault (id, name, encrypted_value, key_version) VALUES (?, ?, ?, ?)", (id_good, "good", good, 1)) + + # invalid record (undecryptable) + bad_blob = base64.urlsafe_b64encode(b"\x01" + os.urandom(30)).decode('ascii') + id_bad = str(uuid.uuid4()) + await db.execute("INSERT INTO credential_vault (id, name, encrypted_value, key_version) VALUES (?, ?, ?, ?)", (id_bad, "bad", bad_blob, 1)) + + with pytest.raises(HTTPException) as exc: + await routes.rotate_vault_keys() + assert exc.value.status_code == 500 + + # Verify no changes were committed (bad record still present and good record decrypts with previous key) + row_bad = await db.fetchone("SELECT encrypted_value FROM credential_vault WHERE id = ?", (id_bad,)) + assert row_bad is not None + row_good = await db.fetchone("SELECT encrypted_value FROM credential_vault WHERE id = ?", (id_good,)) + assert prev_crypto.decrypt(row_good["encrypted_value"]) == "ok" \ No newline at end of file