feat(management): add Fernet encrypted credential storage (AIHCM-182)#295
feat(management): add Fernet encrypted credential storage (AIHCM-182)#295
Conversation
…l model, and migration Introduces the secret store port protocol for encrypted credential management, the ORM model for the encrypted_credentials table, and the Alembic migration to create the table with composite PK (path, tenant_id). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nfig Adds ManagementSettings with KARTOGRAPH_MGMT_ENCRYPTION_KEY env var for comma-separated MultiFernet keys, following the existing IAMSettings pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implements FernetSecretStore using cryptography.fernet.MultiFernet for encrypted credential storage. Satisfies both ISecretStoreRepository and ICredentialReader protocols. Unit tests verify round-trip encryption, key rotation, protocol conformance, and error handling. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests round-trip store/retrieve, credential overwrite, delete behavior, tenant isolation, and KeyError for nonexistent paths. Requires a running PostgreSQL instance with the encrypted_credentials migration applied. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add flush() after store/delete for transactional consistency - Change encryption_key to SecretStr, remove empty default - Add input validation per ICredentialReader contract - Add SecretStoreProbe for Domain Oriented Observability - Move FernetSecretStore to repositories/ directory - Sanitize KeyError messages to prevent information disclosure - Add missing unit tests for delete(), validation, corrupted ciphertext - Add encrypted_credentials to clean_management_data fixture Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…raph into worktree-agent-acdf4215
WalkthroughAdds end-to-end encrypted credential management for the management bounded context: an Alembic migration and SQL table, a SQLAlchemy ORM model, ManagementSettings for encryption keys, a runtime-checkable ISecretStoreRepository protocol, a FernetSecretStore implementation (MultiFernet, AsyncSession-backed) with observability probes, and unit + integration tests covering storage, retrieval, deletion, rotation, and validation. Tenant isolation is enforced via a composite primary key (path, tenant_id). Sequence Diagram(s)sequenceDiagram
actor Client
participant Store as FernetSecretStore
participant Encrypt as MultiFernet
participant DB as PostgreSQL
participant Probe as SecretStoreProbe
Client->>Store: store(path, tenant_id, credentials)
Store->>Store: _validate_inputs(path, tenant_id)
Store->>Encrypt: encrypt(credentials_json)
Encrypt-->>Store: encrypted_value
Store->>DB: merge(EncryptedCredentialModel)
DB-->>Store: success
Store->>Probe: credential_stored(path, tenant_id)
Probe-->>Store: event logged
Store-->>Client: None
sequenceDiagram
actor Client
participant Store as FernetSecretStore
participant DB as PostgreSQL
participant Encrypt as MultiFernet
participant Probe as SecretStoreProbe
Client->>Store: retrieve(path, tenant_id)
Store->>Store: _validate_inputs(path, tenant_id)
Store->>DB: query(EncryptedCredentialModel)
alt Found
DB-->>Store: model with encrypted_value
Store->>Encrypt: decrypt(encrypted_value)
Encrypt-->>Store: plaintext_json
Store->>Store: json.loads(plaintext)
Store->>Probe: credential_retrieved(path, tenant_id)
Probe-->>Store: event logged
Store-->>Client: credentials dict
else Not Found
DB-->>Store: None
Store->>Probe: credential_not_found(path, tenant_id)
Probe-->>Store: event logged
Store-->>Client: KeyError
end
sequenceDiagram
actor Client
participant Store as FernetSecretStore
participant DB as PostgreSQL
participant Probe as SecretStoreProbe
Client->>Store: delete(path, tenant_id)
Store->>Store: _validate_inputs(path, tenant_id)
Store->>DB: query(EncryptedCredentialModel)
alt Found
DB-->>Store: model
Store->>DB: delete(model)
DB-->>Store: success
Store->>Probe: credential_deleted(path, tenant_id)
Probe-->>Store: event logged
Store-->>Client: True
else Not Found
DB-->>Store: None
Store-->>Client: False
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (1)
src/api/management/infrastructure/repositories/fernet_secret_store.py (1)
43-51: Validate encryption key configuration explicitly in the constructor.Failing early with a clear message for empty/blank key lists makes misconfiguration safer and easier to diagnose than relying on downstream constructor errors.
Proposed refactor
def __init__( self, session: AsyncSession, encryption_keys: list[str], probe: SecretStoreProbe | None = None, ) -> None: + if not encryption_keys or any(not key or not key.strip() for key in encryption_keys): + raise ValueError("encryption_keys must contain at least one non-empty key") + self._session = session self._multi_fernet = MultiFernet([Fernet(key) for key in encryption_keys]) self._probe: SecretStoreProbe = probe or DefaultSecretStoreProbe()As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/infrastructure/repositories/fernet_secret_store.py` around lines 43 - 51, The constructor (__init__) of the secret store should validate the encryption_keys parameter and raise a clear, early error if the list is empty or contains only blank/invalid strings instead of letting MultiFernet/Fernet fail later; update the __init__ in fernet_secret_store.py to check that encryption_keys is a non-empty list and that each key is a non-empty/whitespace string (or canonical base64-like key format if you want stricter checks), then raise a ValueError with a descriptive message (e.g., "encryption_keys must contain at least one valid key") before constructing self._multi_fernet = MultiFernet([Fernet(key) ...]); keep existing Probe handling (_probe / DefaultSecretStoreProbe) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/infrastructure/settings.py`:
- Around line 399-402: ManagementSettings.model_config is missing env_file and
env_file_encoding so env vars (e.g., KARTOGRAPH_MGMT_ENCRYPTION_KEY) aren’t
loaded from .env; update the SettingsConfigDict assigned to
ManagementSettings.model_config to include env_file=".env" and
env_file_encoding="utf-8" alongside the existing env_prefix and extra entries so
it matches the other settings classes (DatabaseSettings, CORSSettings,
SpiceDBSettings, OutboxWorkerSettings, IAMSettings, OIDCSettings).
In `@src/api/management/infrastructure/models/encrypted_credential.py`:
- Around line 30-35: The __repr__ implementation on EncryptedCredentialModel
currently includes sensitive fields (path, tenant_id); update the
EncryptedCredentialModel.__repr__ to avoid leaking identifiers by removing or
redacting path and tenant_id (e.g., show "<redacted>" or omit them) and keep
only non-sensitive metadata such as key_version and the class name to aid
debugging; locate the __repr__ method and replace the formatted string so it
does not expose raw path or tenant_id while preserving minimal helpful info.
In `@src/api/management/infrastructure/observability/secret_store_probe.py`:
- Around line 53-83: The probe methods credential_stored, credential_retrieved,
credential_not_found and credential_deleted currently log raw sensitive
identifiers (path, tenant_id); change them to log non-sensitive representations
by passing redacted or hashed versions instead (e.g. call a new helper
_redact_identifier(value) or _hash_identifier(value) and log the redacted value
or a short stable fingerprint), update each method to use that helper when
setting the path and tenant_id fields, and add the helper (e.g.
_redact_identifier or _hash_identifier) on the same class to ensure centralized,
consistent redaction before calling self._logger.* while still including
**self._get_context_kwargs().
In `@src/api/management/infrastructure/repositories/fernet_secret_store.py`:
- Line 70: Calls to DefaultSecretStoreProbe (e.g., from FernetSecretStore
methods that call self._probe.credential_stored / credential_removed /
secret_created / secret_deleted) are forwarding raw tenant-scoped identifiers
(path, tenant_id) into observability events; instead obfuscate these values
before passing them (for example hash or redact them consistently) and update
the probe invocation sites to pass the obfuscated identifier(s). Locate the call
sites in FernetSecretStore (the lines calling self._probe.credential_stored,
credential_removed, secret_created, secret_deleted) and replace the raw
arguments with a sanitized representation, and if needed adjust
DefaultSecretStoreProbe method signatures to accept/expect the obfuscated
identifier so no raw tenant IDs are logged.
In `@src/api/tests/integration/management/conftest.py`:
- Around line 98-100: The single try/transaction around the cleanup lets a
missing encrypted_credentials table abort the whole cleanup; modify the cleanup
in conftest.py so the DELETE FROM encrypted_credentials call (the await
async_session.execute(text("DELETE FROM encrypted_credentials")) statement) is
run inside its own try/except (or its own transaction) that catches the specific
DB error for a missing table (e.g., ProgrammingError/OperationalError) and
logs/ignores it, then proceeds to run the remaining cleanup statements for
outbox/data_source/graph; this ensures async_session continues with the rest of
the cleanup even if the encrypted_credentials table is absent.
In `@src/api/tests/integration/management/test_fernet_secret_store.py`:
- Around line 33-37: The current fixture swallows exceptions during cleanup (the
try/except around async_session.execute(text("DELETE FROM
encrypted_credentials")) with await async_session.rollback()), which masks
DB/schema failures; change the error handling so failures are not suppressed: if
an exception occurs, perform await async_session.rollback() and then re-raise
the exception (or omit the broad try/except entirely) so the test fails fast;
apply the same change to the other cleanup block (the similar try/except at
lines 41-45) and reference async_session.execute, await async_session.commit,
and await async_session.rollback when updating the logic.
---
Nitpick comments:
In `@src/api/management/infrastructure/repositories/fernet_secret_store.py`:
- Around line 43-51: The constructor (__init__) of the secret store should
validate the encryption_keys parameter and raise a clear, early error if the
list is empty or contains only blank/invalid strings instead of letting
MultiFernet/Fernet fail later; update the __init__ in fernet_secret_store.py to
check that encryption_keys is a non-empty list and that each key is a
non-empty/whitespace string (or canonical base64-like key format if you want
stricter checks), then raise a ValueError with a descriptive message (e.g.,
"encryption_keys must contain at least one valid key") before constructing
self._multi_fernet = MultiFernet([Fernet(key) ...]); keep existing Probe
handling (_probe / DefaultSecretStoreProbe) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e2f893d8-527e-4ead-a2cc-d5e9a385bb27
⛔ Files ignored due to path filters (1)
src/api/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (12)
src/api/infrastructure/migrations/versions/d5e6f7a8b9c0_create_encrypted_credentials_table.pysrc/api/infrastructure/settings.pysrc/api/management/infrastructure/models/__init__.pysrc/api/management/infrastructure/models/encrypted_credential.pysrc/api/management/infrastructure/observability/__init__.pysrc/api/management/infrastructure/observability/secret_store_probe.pysrc/api/management/infrastructure/repositories/__init__.pysrc/api/management/infrastructure/repositories/fernet_secret_store.pysrc/api/management/ports/secret_store.pysrc/api/tests/integration/management/conftest.pysrc/api/tests/integration/management/test_fernet_secret_store.pysrc/api/tests/unit/management/infrastructure/test_fernet_secret_store.py
| def __repr__(self) -> str: | ||
| """Return string representation.""" | ||
| return ( | ||
| f"<EncryptedCredentialModel(path={self.path}, " | ||
| f"tenant_id={self.tenant_id}, key_version={self.key_version})>" | ||
| ) |
There was a problem hiding this comment.
__repr__ leaks sensitive identifiers (path, tenant_id).
Model repr values often end up in logs and tracebacks. Exposing credential path and tenant identifier here creates unnecessary metadata leakage risk.
Suggested hardening
def __repr__(self) -> str:
"""Return string representation."""
- return (
- f"<EncryptedCredentialModel(path={self.path}, "
- f"tenant_id={self.tenant_id}, key_version={self.key_version})>"
- )
+ return f"<EncryptedCredentialModel(key_version={self.key_version})>"As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def __repr__(self) -> str: | |
| """Return string representation.""" | |
| return ( | |
| f"<EncryptedCredentialModel(path={self.path}, " | |
| f"tenant_id={self.tenant_id}, key_version={self.key_version})>" | |
| ) | |
| def __repr__(self) -> str: | |
| """Return string representation.""" | |
| return f"<EncryptedCredentialModel(key_version={self.key_version})>" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/infrastructure/models/encrypted_credential.py` around
lines 30 - 35, The __repr__ implementation on EncryptedCredentialModel currently
includes sensitive fields (path, tenant_id); update the
EncryptedCredentialModel.__repr__ to avoid leaking identifiers by removing or
redacting path and tenant_id (e.g., show "<redacted>" or omit them) and keep
only non-sensitive metadata such as key_version and the class name to aid
debugging; locate the __repr__ method and replace the formatted string so it
does not expose raw path or tenant_id while preserving minimal helpful info.
| def credential_stored(self, path: str, tenant_id: str) -> None: | ||
| self._logger.info( | ||
| "credential_stored", | ||
| path=path, | ||
| tenant_id=tenant_id, | ||
| **self._get_context_kwargs(), | ||
| ) | ||
|
|
||
| def credential_retrieved(self, path: str, tenant_id: str) -> None: | ||
| self._logger.debug( | ||
| "credential_retrieved", | ||
| path=path, | ||
| tenant_id=tenant_id, | ||
| **self._get_context_kwargs(), | ||
| ) | ||
|
|
||
| def credential_not_found(self, path: str, tenant_id: str) -> None: | ||
| self._logger.debug( | ||
| "credential_not_found", | ||
| path=path, | ||
| tenant_id=tenant_id, | ||
| **self._get_context_kwargs(), | ||
| ) | ||
|
|
||
| def credential_deleted(self, path: str, tenant_id: str) -> None: | ||
| self._logger.info( | ||
| "credential_deleted", | ||
| path=path, | ||
| tenant_id=tenant_id, | ||
| **self._get_context_kwargs(), | ||
| ) |
There was a problem hiding this comment.
Probe logs currently expose sensitive identifiers (path, tenant_id).
Credential operation telemetry should avoid raw identifiers. Emitting these fields directly increases leakage risk in centralized logs.
Minimize logged sensitive metadata
def credential_stored(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_stored",
- path=path,
- tenant_id=tenant_id,
**self._get_context_kwargs(),
)
@@
def credential_retrieved(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_retrieved",
- path=path,
- tenant_id=tenant_id,
**self._get_context_kwargs(),
)
@@
def credential_not_found(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_not_found",
- path=path,
- tenant_id=tenant_id,
**self._get_context_kwargs(),
)
@@
def credential_deleted(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_deleted",
- path=path,
- tenant_id=tenant_id,
**self._get_context_kwargs(),
)As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def credential_stored(self, path: str, tenant_id: str) -> None: | |
| self._logger.info( | |
| "credential_stored", | |
| path=path, | |
| tenant_id=tenant_id, | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_retrieved(self, path: str, tenant_id: str) -> None: | |
| self._logger.debug( | |
| "credential_retrieved", | |
| path=path, | |
| tenant_id=tenant_id, | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_not_found(self, path: str, tenant_id: str) -> None: | |
| self._logger.debug( | |
| "credential_not_found", | |
| path=path, | |
| tenant_id=tenant_id, | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_deleted(self, path: str, tenant_id: str) -> None: | |
| self._logger.info( | |
| "credential_deleted", | |
| path=path, | |
| tenant_id=tenant_id, | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_stored(self, path: str, tenant_id: str) -> None: | |
| self._logger.info( | |
| "credential_stored", | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_retrieved(self, path: str, tenant_id: str) -> None: | |
| self._logger.debug( | |
| "credential_retrieved", | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_not_found(self, path: str, tenant_id: str) -> None: | |
| self._logger.debug( | |
| "credential_not_found", | |
| **self._get_context_kwargs(), | |
| ) | |
| def credential_deleted(self, path: str, tenant_id: str) -> None: | |
| self._logger.info( | |
| "credential_deleted", | |
| **self._get_context_kwargs(), | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/infrastructure/observability/secret_store_probe.py` around
lines 53 - 83, The probe methods credential_stored, credential_retrieved,
credential_not_found and credential_deleted currently log raw sensitive
identifiers (path, tenant_id); change them to log non-sensitive representations
by passing redacted or hashed versions instead (e.g. call a new helper
_redact_identifier(value) or _hash_identifier(value) and log the redacted value
or a short stable fingerprint), update each method to use that helper when
setting the path and tenant_id fields, and add the helper (e.g.
_redact_identifier or _hash_identifier) on the same class to ensure centralized,
consistent redaction before calling self._logger.* while still including
**self._get_context_kwargs().
| ) | ||
| await self._session.merge(model) | ||
| await self._session.flush() | ||
| self._probe.credential_stored(path, tenant_id) |
There was a problem hiding this comment.
Avoid sending raw path/tenant_id to observability events.
These values are forwarded as-is and DefaultSecretStoreProbe logs them directly, which leaks tenant-scoped identifiers into logs.
Proposed fix
+import hashlib
import json
@@
+def _safe_identifier(value: str) -> str:
+ return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]
+
@@
- self._probe.credential_stored(path, tenant_id)
+ self._probe.credential_stored(
+ _safe_identifier(path), _safe_identifier(tenant_id)
+ )
@@
- self._probe.credential_not_found(path, tenant_id)
+ self._probe.credential_not_found(
+ _safe_identifier(path), _safe_identifier(tenant_id)
+ )
raise KeyError("Credentials not found")
@@
- self._probe.credential_retrieved(path, tenant_id)
+ self._probe.credential_retrieved(
+ _safe_identifier(path), _safe_identifier(tenant_id)
+ )
@@
- self._probe.credential_deleted(path, tenant_id)
+ self._probe.credential_deleted(
+ _safe_identifier(path), _safe_identifier(tenant_id)
+ )As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
Also applies to: 88-89, 92-93, 111-111
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/infrastructure/repositories/fernet_secret_store.py` at
line 70, Calls to DefaultSecretStoreProbe (e.g., from FernetSecretStore methods
that call self._probe.credential_stored / credential_removed / secret_created /
secret_deleted) are forwarding raw tenant-scoped identifiers (path, tenant_id)
into observability events; instead obfuscate these values before passing them
(for example hash or redact them consistently) and update the probe invocation
sites to pass the obfuscated identifier(s). Locate the call sites in
FernetSecretStore (the lines calling self._probe.credential_stored,
credential_removed, secret_created, secret_deleted) and replace the raw
arguments with a sanitized representation, and if needed adjust
DefaultSecretStoreProbe method signatures to accept/expect the obfuscated
identifier so no raw tenant IDs are logged.
| # Clean encrypted credentials first | ||
| await async_session.execute(text("DELETE FROM encrypted_credentials")) | ||
| # Clean management-related outbox entries |
There was a problem hiding this comment.
New first delete can short-circuit all cleanup when the table is missing.
With the current single try transaction, a missing encrypted_credentials table causes rollback before outbox/data-source/graph cleanup runs, which can leak state across tests.
Safer cleanup for this table
- # Clean encrypted credentials first
- await async_session.execute(text("DELETE FROM encrypted_credentials"))
+ # Clean encrypted credentials first (only if table exists)
+ encrypted_table_exists = await async_session.scalar(
+ text("SELECT to_regclass('encrypted_credentials')")
+ )
+ if encrypted_table_exists is not None:
+ await async_session.execute(text("DELETE FROM encrypted_credentials"))As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Clean encrypted credentials first | |
| await async_session.execute(text("DELETE FROM encrypted_credentials")) | |
| # Clean management-related outbox entries | |
| # Clean encrypted credentials first (only if table exists) | |
| encrypted_table_exists = await async_session.scalar( | |
| text("SELECT to_regclass('encrypted_credentials')") | |
| ) | |
| if encrypted_table_exists is not None: | |
| await async_session.execute(text("DELETE FROM encrypted_credentials")) | |
| # Clean management-related outbox entries |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/tests/integration/management/conftest.py` around lines 98 - 100, The
single try/transaction around the cleanup lets a missing encrypted_credentials
table abort the whole cleanup; modify the cleanup in conftest.py so the DELETE
FROM encrypted_credentials call (the await async_session.execute(text("DELETE
FROM encrypted_credentials")) statement) is run inside its own try/except (or
its own transaction) that catches the specific DB error for a missing table
(e.g., ProgrammingError/OperationalError) and logs/ignores it, then proceeds to
run the remaining cleanup statements for outbox/data_source/graph; this ensures
async_session continues with the rest of the cleanup even if the
encrypted_credentials table is absent.
| try: | ||
| await async_session.execute(text("DELETE FROM encrypted_credentials")) | ||
| await async_session.commit() | ||
| except Exception: | ||
| await async_session.rollback() |
There was a problem hiding this comment.
Don’t swallow cleanup failures in the integration fixture.
If table cleanup fails, tests should fail fast; otherwise schema/DB issues get masked and results become unreliable.
Proposed fix
`@pytest_asyncio.fixture`
async def clean_encrypted_credentials(
async_session: AsyncSession,
) -> AsyncGenerator[None, None]:
"""Clean encrypted_credentials table before each test."""
try:
await async_session.execute(text("DELETE FROM encrypted_credentials"))
await async_session.commit()
except Exception:
await async_session.rollback()
+ raise
yield
try:
await async_session.execute(text("DELETE FROM encrypted_credentials"))
await async_session.commit()
except Exception:
await async_session.rollback()
+ raiseAs per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
Also applies to: 41-45
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/tests/integration/management/test_fernet_secret_store.py` around
lines 33 - 37, The current fixture swallows exceptions during cleanup (the
try/except around async_session.execute(text("DELETE FROM
encrypted_credentials")) with await async_session.rollback()), which masks
DB/schema failures; change the error handling so failures are not suppressed: if
an exception occurs, perform await async_session.rollback() and then re-raise
the exception (or omit the broad try/except entirely) so the test fails fast;
apply the same change to the other cleanup block (the similar try/except at
lines 41-45) and reference async_session.execute, await async_session.commit,
and await async_session.rollback when updating the logic.
…to SecretStoreProbe - Add env_file=".env" and env_file_encoding="utf-8" to ManagementSettings matching all other Settings classes - Add with_context method to SecretStoreProbe Protocol and Default implementation matching the repository probe pattern Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/infrastructure/settings.py`:
- Around line 406-408: Add a Pydantic validator for the encryption_key Field
that fails fast: implement a `@validator`('encryption_key') method (e.g.
validate_encryption_key) on the same settings model that reads the SecretStr,
splits on commas, strips whitespace, rejects empty segments and then attempts to
construct cryptography.fernet.Fernet for each key to verify format; raise a
ValueError with a clear message on any empty/malformed key so misconfiguration
surfaces at startup and return the original SecretStr on success.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1a525b82-212f-4cd2-95b0-0ab445883599
⛔ Files ignored due to path filters (1)
src/api/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
src/api/infrastructure/settings.pysrc/api/management/infrastructure/observability/secret_store_probe.py
| encryption_key: SecretStr = Field( | ||
| description="Comma-separated Fernet keys for MultiFernet", | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Confirm no existing validator on ManagementSettings.encryption_key
rg -n "class ManagementSettings|encryption_key|field_validator|model_validator" src/api/infrastructure/settings.py -C3
# 2) Locate where encryption_key is consumed to verify failures are currently deferred
rg -n "get_management_settings\\(|encryption_key\\.get_secret_value\\(|MultiFernet|Fernet\\(" src -C3Repository: openshift-hyperfleet/kartograph
Length of output: 5912
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check how get_management_settings() is used and how encryption_key flows to FernetSecretStore
rg -n "get_management_settings()" src -A5 -B2
# Check how FernetSecretStore is instantiated with encryption_key
rg -n "FernetSecretStore" src -A10 -B2
# Check the actual consumption of encryption_key value
rg -n "encryption_key.get_secret_value()" src -A3 -B2Repository: openshift-hyperfleet/kartograph
Length of output: 16174
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find where FernetSecretStore is instantiated in actual application code (not tests)
rg -n "FernetSecretStore\(" src/api -B5 -A5 | grep -v "test"
# Also search for where encryption_key is split or parsed
rg -n "encryption_key" src/api/api -B3 -A3
# Look for dependency injection or factory setup
rg -n "def.*fernet\|def.*secret.*store" src/api/api -i -B2 -A5Repository: openshift-hyperfleet/kartograph
Length of output: 146
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find FernetSecretStore instantiation in application code (not tests)
rg -n "FernetSecretStore(" src --type python -B3 -A3 | grep -v "test"
# Look for where encryption_key is used/parsed
rg -n "encryption_key" src --type python -B2 -A2 | grep -v "test" | head -40
# Check directory structure
find src -type d -name "*api*" -o -name "*app*" -o -name "*main*" | head -20Repository: openshift-hyperfleet/kartograph
Length of output: 161
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find FernetSecretStore instantiation in application code (not tests)
rg -n "FernetSecretStore" src --type python -B3 -A3 | grep -v "test"
# Look for where encryption_key is split/parsed
rg -n "\.split\(" src/api/infrastructure/settings.py -B3 -A3
# Check for any factory or initialization code
rg -n "get_management_settings\(\)" src --type python -B2 -A10 | grep -v "test"Repository: openshift-hyperfleet/kartograph
Length of output: 105
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find FernetSecretStore instantiation
rg "FernetSecretStore" src -B3 -A3 | grep -v test
# Check for split operations on encryption_key
rg "split" src/api/infrastructure/settings.py -B3 -A3
# Look at the full context around get_management_settings
rg "get_management_settings" src -B2 -A15 | head -60
# Check if there's parsing logic elsewhere
rg "encryption_key.*split\|split.*encryption_key" src -B2 -A2Repository: openshift-hyperfleet/kartograph
Length of output: 1977
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Read the ManagementSettings class completely
sed -n '392,500p' src/api/infrastructure/settings.py
# Look for any dependencies or properties that use ManagementSettings
rg "ManagementSettings" src -B2 -A5 | grep -v test | head -80
# Search for async context or app setup
fd -e "py" . src/api | grep -E "(main|app|startup|factory)" | head -10Repository: openshift-hyperfleet/kartograph
Length of output: 2379
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for where get_management_settings is actually used
rg "get_management_settings" src -B2 -A10
# Look for dependency injection patterns (Depends, provide, factory)
rg "Depends.*management\|provide.*fernet\|FernetSecretStore" src -B3 -A5 | grep -v test
# Check for any service initialization
rg "class.*Service\|def.*factory" src/api --max-count=20 -B2 -A3 | head -80Repository: openshift-hyperfleet/kartograph
Length of output: 585
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for all imports and usages of ManagementSettings or settings module
rg "from.*settings import|import.*ManagementSettings|get_management_settings" src
# Look at the main entry point or app initialization
find src -name "main.py" -o -name "__main__.py" -o -name "app.py" | head -5
# Look at the complete structure to understand the app
find src -maxdepth 3 -type f -name "*.py" | grep -E "(main|app|__init__)" | head -20Repository: openshift-hyperfleet/kartograph
Length of output: 4065
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check main.py to see the app structure
cat src/api/main.py
# Check if ManagementSettings or get_management_settings is used anywhere
rg "ManagementSettings\|get_management_settings" src -l
# Look at the management __init__ to see what's exported
head -50 src/api/management/__init__.pyRepository: openshift-hyperfleet/kartograph
Length of output: 10423
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get all files that reference get_management_settings or ManagementSettings
rg "get_management_settings\|ManagementSettings" src -l
# Check the management __init__ to understand what's actually used
cat src/api/management/__init__.py
# Look for any dependencies that might use encryption_key
rg "encryption_key\|get_management_settings" src/api/management -B3 -A3Repository: openshift-hyperfleet/kartograph
Length of output: 57
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check what's in management __init__.py
cat src/api/management/__init__.py 2>/dev/null || echo "File not found"
# Look at management directory structure
find src/api/management -type f -name "*.py" | head -20
# Check if management is actually integrated anywhere
rg "from management\|import.*management" src/api --exclude-dir=tests
# Look for any TODO or FIXME related to management or encryption
rg "TODO|FIXME|XXX" src/api/infrastructure/settings.py -B2 -A2
# Check test files to understand if this feature is being tested
find src/api/tests -name "*management*" -type fRepository: openshift-hyperfleet/kartograph
Length of output: 1685
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for ManagementSettings or encryption_key usage in management module
rg "ManagementSettings\|encryption_key" src/api/management -B3 -A3
# Check where credentials are actually being stored/retrieved
rg "store.*credential\|retrieve.*credential" src/api/management -B2 -A5
# Look at the FernetSecretStore constructor to see what it expects
sed -n '32,55p' src/api/management/infrastructure/repositories/fernet_secret_store.py
# Check if there's any dependency setup for FernetSecretStore
rg "FernetSecretStore\|@provides\|def.*secret" src/api/management/infrastructure -B2 -A5Repository: openshift-hyperfleet/kartograph
Length of output: 57
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Just list what's in the management infrastructure directory
ls -la src/api/management/infrastructure/
# Look at the full FernetSecretStore class
head -100 src/api/management/infrastructure/repositories/fernet_secret_store.py
# Check if there's a dependencies file
find src/api/management -name "*depend*" -o -name "*factory*"
# Search for where encryption_keys is constructed (the list)
rg "encryption_keys.*=" src -A2 | head -30Repository: openshift-hyperfleet/kartograph
Length of output: 4559
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for any test that actually uses ManagementSettings with encryption_key
rg "ManagementSettings" src/api/tests -B5 -A10 | head -50
# Look for any initialization or factory pattern for management bounded context
rg "class.*Factory\|def.*bootstrap\|def.*initialize" src/api/management -B2 -A8 | head -60
# Check the management __init__ to see what's exported
cat src/api/management/__init__.py
# Search for comments about encryption_key or TODO for management
rg "encryption_key\|ENCRYPTION_KEY" src/api -B3 -A3 | grep -v test | head -40Repository: openshift-hyperfleet/kartograph
Length of output: 57
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Look for tests on settings module itself
cat src/api/tests/unit/infrastructure/test_settings.py
# Check if ManagementSettings is mentioned anywhere
rg "ManagementSettings" src/api
# Look at what the test imports from settings
rg "from infrastructure.settings import" src/api/tests/unit/infrastructure/test_settings.pyRepository: openshift-hyperfleet/kartograph
Length of output: 6913
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Final verification - search for any actual usage of get_management_settings in the entire codebase
rg "get_management_settings" src
# Check if there's any code that parses encryption_key or creates FernetSecretStore in main code
rg "FernetSecretStore\(" src/api -v test
# Verify the complete picture - what imports ManagementSettings
rg "ManagementSettings" src/apiRepository: openshift-hyperfleet/kartograph
Length of output: 50451
Add validation to encryption_key field to fail fast on misconfiguration.
The encryption_key field accepts any string without validation. Empty strings, extra commas, or malformed Fernet keys will only fail later when cryptographic operations are attempted. Validate the field at settings load time to surface configuration errors at startup.
Proposed fix
-from pydantic import Field, SecretStr, model_validator
+from pydantic import Field, SecretStr, field_validator, model_validator
+from cryptography.fernet import Fernet
...
class ManagementSettings(BaseSettings):
@@
encryption_key: SecretStr = Field(
description="Comma-separated Fernet keys for MultiFernet",
)
+
+ `@field_validator`("encryption_key")
+ `@classmethod`
+ def validate_encryption_key(cls, value: SecretStr) -> SecretStr:
+ raw = value.get_secret_value().strip()
+ if not raw:
+ raise ValueError("KARTOGRAPH_MGMT_ENCRYPTION_KEY must not be empty")
+
+ keys = [k.strip() for k in raw.split(",")]
+ if any(not k for k in keys):
+ raise ValueError(
+ "KARTOGRAPH_MGMT_ENCRYPTION_KEY contains an empty key entry"
+ )
+
+ for key in keys:
+ Fernet(key.encode("utf-8")) # raises on invalid Fernet key
+
+ return value📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| encryption_key: SecretStr = Field( | |
| description="Comma-separated Fernet keys for MultiFernet", | |
| ) | |
| encryption_key: SecretStr = Field( | |
| description="Comma-separated Fernet keys for MultiFernet", | |
| ) | |
| `@field_validator`("encryption_key") | |
| `@classmethod` | |
| def validate_encryption_key(cls, value: SecretStr) -> SecretStr: | |
| raw = value.get_secret_value().strip() | |
| if not raw: | |
| raise ValueError("KARTOGRAPH_MGMT_ENCRYPTION_KEY must not be empty") | |
| keys = [k.strip() for k in raw.split(",")] | |
| if any(not k for k in keys): | |
| raise ValueError( | |
| "KARTOGRAPH_MGMT_ENCRYPTION_KEY contains an empty key entry" | |
| ) | |
| for key in keys: | |
| Fernet(key.encode("utf-8")) # raises on invalid Fernet key | |
| return value |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/infrastructure/settings.py` around lines 406 - 408, Add a Pydantic
validator for the encryption_key Field that fails fast: implement a
`@validator`('encryption_key') method (e.g. validate_encryption_key) on the same
settings model that reads the SecretStr, splits on commas, strips whitespace,
rejects empty segments and then attempts to construct cryptography.fernet.Fernet
for each key to verify format; raise a ValueError with a clear message on any
empty/malformed key so misconfiguration surfaces at startup and return the
original SecretStr on success.
Summary
FernetSecretStoreimplementing bothISecretStoreRepository(management write port) andICredentialReader(shared kernel read port)encrypted_credentialstable with composite PK(path, tenant_id)for tenant-scoped isolationManagementSettingswithSecretStrencryption key configurationSecretStoreProbefor Domain Oriented Observability on all credential operationsICredentialReadercontract (ValueErrorfor empty path/tenant_id)Test plan
🤖 Generated with Claude Code
Summary by CodeRabbit