Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""create encrypted_credentials table

Adds the encrypted_credentials table for Fernet-encrypted credential
storage in the Management bounded context.

Revision ID: d5e6f7a8b9c0
Revises: c4d5e6f7a8b9
Create Date: 2026-03-17
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision: str = "d5e6f7a8b9c0"
down_revision: Union[str, Sequence[str], None] = "c4d5e6f7a8b9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Create encrypted_credentials table with composite PK (path, tenant_id)."""
op.create_table(
"encrypted_credentials",
sa.Column("path", sa.String(500), primary_key=True),
sa.Column("tenant_id", sa.String(26), primary_key=True),
sa.Column("encrypted_value", sa.LargeBinary, nullable=False),
sa.Column("key_version", sa.Integer, nullable=False, server_default="0"),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
)


def downgrade() -> None:
"""Drop encrypted_credentials table."""
op.drop_table("encrypted_credentials")
28 changes: 28 additions & 0 deletions src/api/infrastructure/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,31 @@ def get_oidc_settings() -> OIDCSettings:
Uses lru_cache to ensure settings are only loaded once.
"""
return OIDCSettings()


class ManagementSettings(BaseSettings):
"""Management bounded context settings.

Environment variables:
KARTOGRAPH_MGMT_ENCRYPTION_KEY: Comma-separated Fernet keys for MultiFernet
"""

model_config = SettingsConfigDict(
env_prefix="KARTOGRAPH_MGMT_",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)

encryption_key: SecretStr = Field(
description="Comma-separated Fernet keys for MultiFernet",
)
Comment on lines +406 to +408
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Confirm no existing validator on ManagementSettings.encryption_key
rg -n "class ManagementSettings|encryption_key|field_validator|model_validator" src/api/infrastructure/settings.py -C3

# 2) Locate where encryption_key is consumed to verify failures are currently deferred
rg -n "get_management_settings\\(|encryption_key\\.get_secret_value\\(|MultiFernet|Fernet\\(" src -C3

Repository: openshift-hyperfleet/kartograph

Length of output: 5912


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check how get_management_settings() is used and how encryption_key flows to FernetSecretStore
rg -n "get_management_settings()" src -A5 -B2

# Check how FernetSecretStore is instantiated with encryption_key
rg -n "FernetSecretStore" src -A10 -B2

# Check the actual consumption of encryption_key value
rg -n "encryption_key.get_secret_value()" src -A3 -B2

Repository: openshift-hyperfleet/kartograph

Length of output: 16174


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find where FernetSecretStore is instantiated in actual application code (not tests)
rg -n "FernetSecretStore\(" src/api -B5 -A5 | grep -v "test"

# Also search for where encryption_key is split or parsed
rg -n "encryption_key" src/api/api -B3 -A3

# Look for dependency injection or factory setup
rg -n "def.*fernet\|def.*secret.*store" src/api/api -i -B2 -A5

Repository: openshift-hyperfleet/kartograph

Length of output: 146


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find FernetSecretStore instantiation in application code (not tests)
rg -n "FernetSecretStore(" src --type python -B3 -A3 | grep -v "test"

# Look for where encryption_key is used/parsed
rg -n "encryption_key" src --type python -B2 -A2 | grep -v "test" | head -40

# Check directory structure
find src -type d -name "*api*" -o -name "*app*" -o -name "*main*" | head -20

Repository: openshift-hyperfleet/kartograph

Length of output: 161


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find FernetSecretStore instantiation in application code (not tests)
rg -n "FernetSecretStore" src --type python -B3 -A3 | grep -v "test"

# Look for where encryption_key is split/parsed
rg -n "\.split\(" src/api/infrastructure/settings.py -B3 -A3

# Check for any factory or initialization code
rg -n "get_management_settings\(\)" src --type python -B2 -A10 | grep -v "test"

Repository: openshift-hyperfleet/kartograph

Length of output: 105


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find FernetSecretStore instantiation
rg "FernetSecretStore" src -B3 -A3 | grep -v test

# Check for split operations on encryption_key
rg "split" src/api/infrastructure/settings.py -B3 -A3

# Look at the full context around get_management_settings
rg "get_management_settings" src -B2 -A15 | head -60

# Check if there's parsing logic elsewhere
rg "encryption_key.*split\|split.*encryption_key" src -B2 -A2

Repository: openshift-hyperfleet/kartograph

Length of output: 1977


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Read the ManagementSettings class completely
sed -n '392,500p' src/api/infrastructure/settings.py

# Look for any dependencies or properties that use ManagementSettings
rg "ManagementSettings" src -B2 -A5 | grep -v test | head -80

# Search for async context or app setup
fd -e "py" . src/api | grep -E "(main|app|startup|factory)" | head -10

Repository: openshift-hyperfleet/kartograph

Length of output: 2379


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for where get_management_settings is actually used
rg "get_management_settings" src -B2 -A10

# Look for dependency injection patterns (Depends, provide, factory)
rg "Depends.*management\|provide.*fernet\|FernetSecretStore" src -B3 -A5 | grep -v test

# Check for any service initialization
rg "class.*Service\|def.*factory" src/api --max-count=20 -B2 -A3 | head -80

Repository: openshift-hyperfleet/kartograph

Length of output: 585


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for all imports and usages of ManagementSettings or settings module
rg "from.*settings import|import.*ManagementSettings|get_management_settings" src

# Look at the main entry point or app initialization
find src -name "main.py" -o -name "__main__.py" -o -name "app.py" | head -5

# Look at the complete structure to understand the app
find src -maxdepth 3 -type f -name "*.py" | grep -E "(main|app|__init__)" | head -20

Repository: openshift-hyperfleet/kartograph

Length of output: 4065


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check main.py to see the app structure
cat src/api/main.py

# Check if ManagementSettings or get_management_settings is used anywhere
rg "ManagementSettings\|get_management_settings" src -l

# Look at the management __init__ to see what's exported
head -50 src/api/management/__init__.py

Repository: openshift-hyperfleet/kartograph

Length of output: 10423


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get all files that reference get_management_settings or ManagementSettings
rg "get_management_settings\|ManagementSettings" src -l

# Check the management __init__ to understand what's actually used
cat src/api/management/__init__.py

# Look for any dependencies that might use encryption_key
rg "encryption_key\|get_management_settings" src/api/management -B3 -A3

Repository: openshift-hyperfleet/kartograph

Length of output: 57


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check what's in management __init__.py
cat src/api/management/__init__.py 2>/dev/null || echo "File not found"

# Look at management directory structure
find src/api/management -type f -name "*.py" | head -20

# Check if management is actually integrated anywhere
rg "from management\|import.*management" src/api --exclude-dir=tests

# Look for any TODO or FIXME related to management or encryption
rg "TODO|FIXME|XXX" src/api/infrastructure/settings.py -B2 -A2

# Check test files to understand if this feature is being tested
find src/api/tests -name "*management*" -type f

Repository: openshift-hyperfleet/kartograph

Length of output: 1685


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for ManagementSettings or encryption_key usage in management module
rg "ManagementSettings\|encryption_key" src/api/management -B3 -A3

# Check where credentials are actually being stored/retrieved
rg "store.*credential\|retrieve.*credential" src/api/management -B2 -A5

# Look at the FernetSecretStore constructor to see what it expects
sed -n '32,55p' src/api/management/infrastructure/repositories/fernet_secret_store.py

# Check if there's any dependency setup for FernetSecretStore
rg "FernetSecretStore\|@provides\|def.*secret" src/api/management/infrastructure -B2 -A5

Repository: openshift-hyperfleet/kartograph

Length of output: 57


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Just list what's in the management infrastructure directory
ls -la src/api/management/infrastructure/

# Look at the full FernetSecretStore class
head -100 src/api/management/infrastructure/repositories/fernet_secret_store.py

# Check if there's a dependencies file
find src/api/management -name "*depend*" -o -name "*factory*"

# Search for where encryption_keys is constructed (the list)
rg "encryption_keys.*=" src -A2 | head -30

Repository: openshift-hyperfleet/kartograph

Length of output: 4559


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for any test that actually uses ManagementSettings with encryption_key
rg "ManagementSettings" src/api/tests -B5 -A10 | head -50

# Look for any initialization or factory pattern for management bounded context
rg "class.*Factory\|def.*bootstrap\|def.*initialize" src/api/management -B2 -A8 | head -60

# Check the management __init__ to see what's exported
cat src/api/management/__init__.py

# Search for comments about encryption_key or TODO for management
rg "encryption_key\|ENCRYPTION_KEY" src/api -B3 -A3 | grep -v test | head -40

Repository: openshift-hyperfleet/kartograph

Length of output: 57


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Look for tests on settings module itself
cat src/api/tests/unit/infrastructure/test_settings.py

# Check if ManagementSettings is mentioned anywhere
rg "ManagementSettings" src/api

# Look at what the test imports from settings
rg "from infrastructure.settings import" src/api/tests/unit/infrastructure/test_settings.py

Repository: openshift-hyperfleet/kartograph

Length of output: 6913


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Final verification - search for any actual usage of get_management_settings in the entire codebase
rg "get_management_settings" src

# Check if there's any code that parses encryption_key or creates FernetSecretStore in main code
rg "FernetSecretStore\(" src/api -v test

# Verify the complete picture - what imports ManagementSettings
rg "ManagementSettings" src/api

Repository: openshift-hyperfleet/kartograph

Length of output: 50451


Add validation to encryption_key field to fail fast on misconfiguration.

The encryption_key field accepts any string without validation. Empty strings, extra commas, or malformed Fernet keys will only fail later when cryptographic operations are attempted. Validate the field at settings load time to surface configuration errors at startup.

Proposed fix
-from pydantic import Field, SecretStr, model_validator
+from pydantic import Field, SecretStr, field_validator, model_validator
+from cryptography.fernet import Fernet
...
 class ManagementSettings(BaseSettings):
@@
     encryption_key: SecretStr = Field(
         description="Comma-separated Fernet keys for MultiFernet",
     )
+
+    `@field_validator`("encryption_key")
+    `@classmethod`
+    def validate_encryption_key(cls, value: SecretStr) -> SecretStr:
+        raw = value.get_secret_value().strip()
+        if not raw:
+            raise ValueError("KARTOGRAPH_MGMT_ENCRYPTION_KEY must not be empty")
+
+        keys = [k.strip() for k in raw.split(",")]
+        if any(not k for k in keys):
+            raise ValueError(
+                "KARTOGRAPH_MGMT_ENCRYPTION_KEY contains an empty key entry"
+            )
+
+        for key in keys:
+            Fernet(key.encode("utf-8"))  # raises on invalid Fernet key
+
+        return value
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
encryption_key: SecretStr = Field(
description="Comma-separated Fernet keys for MultiFernet",
)
encryption_key: SecretStr = Field(
description="Comma-separated Fernet keys for MultiFernet",
)
`@field_validator`("encryption_key")
`@classmethod`
def validate_encryption_key(cls, value: SecretStr) -> SecretStr:
raw = value.get_secret_value().strip()
if not raw:
raise ValueError("KARTOGRAPH_MGMT_ENCRYPTION_KEY must not be empty")
keys = [k.strip() for k in raw.split(",")]
if any(not k for k in keys):
raise ValueError(
"KARTOGRAPH_MGMT_ENCRYPTION_KEY contains an empty key entry"
)
for key in keys:
Fernet(key.encode("utf-8")) # raises on invalid Fernet key
return value
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/infrastructure/settings.py` around lines 406 - 408, Add a Pydantic
validator for the encryption_key Field that fails fast: implement a
`@validator`('encryption_key') method (e.g. validate_encryption_key) on the same
settings model that reads the SecretStr, splits on commas, strips whitespace,
rejects empty segments and then attempts to construct cryptography.fernet.Fernet
for each key to verify format; raise a ValueError with a clear message on any
empty/malformed key so misconfiguration surfaces at startup and return the
original SecretStr on success.



@lru_cache
def get_management_settings() -> ManagementSettings:
"""Get cached Management settings.

Uses lru_cache to ensure settings are only loaded once.
"""
return ManagementSettings() # type: ignore[call-arg]
4 changes: 4 additions & 0 deletions src/api/management/infrastructure/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

from management.infrastructure.models.data_source import DataSourceModel
from management.infrastructure.models.data_source_sync_run import DataSourceSyncRunModel
from management.infrastructure.models.encrypted_credential import (
EncryptedCredentialModel,
)
from management.infrastructure.models.knowledge_graph import KnowledgeGraphModel

__all__ = [
"DataSourceModel",
"DataSourceSyncRunModel",
"EncryptedCredentialModel",
"KnowledgeGraphModel",
]
35 changes: 35 additions & 0 deletions src/api/management/infrastructure/models/encrypted_credential.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""SQLAlchemy ORM model for the encrypted_credentials table.

Stores Fernet-encrypted credentials in PostgreSQL, scoped by path
and tenant_id. Part of the Management bounded context.
"""

from __future__ import annotations

from sqlalchemy import Integer, LargeBinary, String
from sqlalchemy.orm import Mapped, mapped_column

from infrastructure.database.models import Base, TimestampMixin


class EncryptedCredentialModel(Base, TimestampMixin):
"""ORM model for encrypted_credentials table.

Stores encrypted credential blobs keyed by (path, tenant_id).
The key_version column tracks which Fernet key was used for
encryption to support key rotation.
"""

__tablename__ = "encrypted_credentials"

path: Mapped[str] = mapped_column(String(500), primary_key=True)
tenant_id: Mapped[str] = mapped_column(String(26), primary_key=True)
encrypted_value: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
key_version: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

def __repr__(self) -> str:
"""Return string representation."""
return (
f"<EncryptedCredentialModel(path={self.path}, "
f"tenant_id={self.tenant_id}, key_version={self.key_version})>"
)
Comment on lines +30 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

__repr__ leaks sensitive identifiers (path, tenant_id).

Model repr values often end up in logs and tracebacks. Exposing credential path and tenant identifier here creates unnecessary metadata leakage risk.

Suggested hardening
 def __repr__(self) -> str:
     """Return string representation."""
-    return (
-        f"<EncryptedCredentialModel(path={self.path}, "
-        f"tenant_id={self.tenant_id}, key_version={self.key_version})>"
-    )
+    return f"<EncryptedCredentialModel(key_version={self.key_version})>"

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __repr__(self) -> str:
"""Return string representation."""
return (
f"<EncryptedCredentialModel(path={self.path}, "
f"tenant_id={self.tenant_id}, key_version={self.key_version})>"
)
def __repr__(self) -> str:
"""Return string representation."""
return f"<EncryptedCredentialModel(key_version={self.key_version})>"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/infrastructure/models/encrypted_credential.py` around
lines 30 - 35, The __repr__ implementation on EncryptedCredentialModel currently
includes sensitive fields (path, tenant_id); update the
EncryptedCredentialModel.__repr__ to avoid leaking identifiers by removing or
redacting path and tenant_id (e.g., show "<redacted>" or omit them) and keep
only non-sensitive metadata such as key_version and the class name to aid
debugging; locate the __repr__ method and replace the formatted string so it
does not expose raw path or tenant_id while preserving minimal helpful info.

6 changes: 6 additions & 0 deletions src/api/management/infrastructure/observability/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
KnowledgeGraphRepositoryProbe,
SyncRunRepositoryProbe,
)
from management.infrastructure.observability.secret_store_probe import (
DefaultSecretStoreProbe,
SecretStoreProbe,
)

__all__ = [
"DataSourceRepositoryProbe",
"DefaultDataSourceRepositoryProbe",
"DefaultKnowledgeGraphRepositoryProbe",
"DefaultSecretStoreProbe",
"DefaultSyncRunRepositoryProbe",
"KnowledgeGraphRepositoryProbe",
"SecretStoreProbe",
"SyncRunRepositoryProbe",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Domain probes for secret store operations.

Following Domain-Oriented Observability patterns, these probes capture
domain-significant events related to credential storage operations.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol

import structlog

if TYPE_CHECKING:
from shared_kernel.observability_context import ObservationContext


class SecretStoreProbe(Protocol):
"""Domain probe for secret store operations."""

def credential_stored(self, path: str, tenant_id: str) -> None:
"""Record that credentials were successfully stored."""
...

def credential_retrieved(self, path: str, tenant_id: str) -> None:
"""Record that credentials were successfully retrieved."""
...

def credential_not_found(self, path: str, tenant_id: str) -> None:
"""Record that credentials were not found."""
...

def credential_deleted(self, path: str, tenant_id: str) -> None:
"""Record that credentials were successfully deleted."""
...

def with_context(self, context: ObservationContext) -> SecretStoreProbe:
"""Create a new probe with observation context bound."""
...


class DefaultSecretStoreProbe:
"""Default implementation of SecretStoreProbe using structlog."""

def __init__(
self,
logger: structlog.stdlib.BoundLogger | None = None,
context: ObservationContext | None = None,
) -> None:
self._logger = logger or structlog.get_logger()
self._context = context

def _get_context_kwargs(self) -> dict[str, Any]:
if self._context is None:
return {}
return self._context.as_dict()

def with_context(self, context: ObservationContext) -> DefaultSecretStoreProbe:
return DefaultSecretStoreProbe(logger=self._logger, context=context)

def credential_stored(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_stored",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)

def credential_retrieved(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_retrieved",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)

def credential_not_found(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_not_found",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)

def credential_deleted(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_deleted",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)
Comment on lines +60 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Probe logs currently expose sensitive identifiers (path, tenant_id).

Credential operation telemetry should avoid raw identifiers. Emitting these fields directly increases leakage risk in centralized logs.

Minimize logged sensitive metadata
     def credential_stored(self, path: str, tenant_id: str) -> None:
         self._logger.info(
             "credential_stored",
-            path=path,
-            tenant_id=tenant_id,
             **self._get_context_kwargs(),
         )
@@
     def credential_retrieved(self, path: str, tenant_id: str) -> None:
         self._logger.debug(
             "credential_retrieved",
-            path=path,
-            tenant_id=tenant_id,
             **self._get_context_kwargs(),
         )
@@
     def credential_not_found(self, path: str, tenant_id: str) -> None:
         self._logger.debug(
             "credential_not_found",
-            path=path,
-            tenant_id=tenant_id,
             **self._get_context_kwargs(),
         )
@@
     def credential_deleted(self, path: str, tenant_id: str) -> None:
         self._logger.info(
             "credential_deleted",
-            path=path,
-            tenant_id=tenant_id,
             **self._get_context_kwargs(),
         )

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def credential_stored(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_stored",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)
def credential_retrieved(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_retrieved",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)
def credential_not_found(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_not_found",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)
def credential_deleted(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_deleted",
path=path,
tenant_id=tenant_id,
**self._get_context_kwargs(),
)
def credential_stored(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_stored",
**self._get_context_kwargs(),
)
def credential_retrieved(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_retrieved",
**self._get_context_kwargs(),
)
def credential_not_found(self, path: str, tenant_id: str) -> None:
self._logger.debug(
"credential_not_found",
**self._get_context_kwargs(),
)
def credential_deleted(self, path: str, tenant_id: str) -> None:
self._logger.info(
"credential_deleted",
**self._get_context_kwargs(),
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/infrastructure/observability/secret_store_probe.py` around
lines 53 - 83, The probe methods credential_stored, credential_retrieved,
credential_not_found and credential_deleted currently log raw sensitive
identifiers (path, tenant_id); change them to log non-sensitive representations
by passing redacted or hashed versions instead (e.g. call a new helper
_redact_identifier(value) or _hash_identifier(value) and log the redacted value
or a short stable fingerprint), update each method to use that helper when
setting the path and tenant_id fields, and add the helper (e.g.
_redact_identifier or _hash_identifier) on the same class to ensure centralized,
consistent redaction before calling self._logger.* while still including
**self._get_context_kwargs().

4 changes: 4 additions & 0 deletions src/api/management/infrastructure/repositories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
from management.infrastructure.repositories.data_source_sync_run_repository import (
DataSourceSyncRunRepository,
)
from management.infrastructure.repositories.fernet_secret_store import (
FernetSecretStore,
)
from management.infrastructure.repositories.knowledge_graph_repository import (
KnowledgeGraphRepository,
)

__all__ = [
"DataSourceRepository",
"DataSourceSyncRunRepository",
"FernetSecretStore",
"KnowledgeGraphRepository",
]
112 changes: 112 additions & 0 deletions src/api/management/infrastructure/repositories/fernet_secret_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Fernet-encrypted secret store implementation.

Uses cryptography.fernet.MultiFernet to encrypt credentials at rest
in PostgreSQL, supporting key rotation via multiple Fernet keys.
"""

from __future__ import annotations

import json

from cryptography.fernet import Fernet, MultiFernet
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from management.infrastructure.models.encrypted_credential import (
EncryptedCredentialModel,
)
from management.infrastructure.observability.secret_store_probe import (
DefaultSecretStoreProbe,
SecretStoreProbe,
)


def _validate_inputs(path: str, tenant_id: str) -> None:
"""Validate that path and tenant_id are non-empty."""
if not path or not path.strip():
raise ValueError("path must not be empty or whitespace-only")
if not tenant_id or not tenant_id.strip():
raise ValueError("tenant_id must not be empty or whitespace-only")


class FernetSecretStore:
"""Fernet-encrypted credential storage backed by PostgreSQL.

Implements both ISecretStoreRepository (management port) and
ICredentialReader (shared kernel) for a single implementation
that satisfies both read-write and read-only consumers.

Uses MultiFernet to support key rotation: the first key in the
list is used for encryption, and all keys are tried for decryption.
"""

def __init__(
self,
session: AsyncSession,
encryption_keys: list[str],
probe: SecretStoreProbe | None = None,
) -> None:
self._session = session
self._multi_fernet = MultiFernet([Fernet(key) for key in encryption_keys])
self._probe: SecretStoreProbe = probe or DefaultSecretStoreProbe()

async def store(
self, path: str, tenant_id: str, credentials: dict[str, str]
) -> None:
"""Encrypt and persist credentials via upsert."""
_validate_inputs(path, tenant_id)

plaintext = json.dumps(credentials).encode("utf-8")
encrypted = self._multi_fernet.encrypt(plaintext)

model = EncryptedCredentialModel(
path=path,
tenant_id=tenant_id,
encrypted_value=encrypted,
key_version=0,
)
await self._session.merge(model)
await self._session.flush()
self._probe.credential_stored(path, tenant_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid sending raw path/tenant_id to observability events.

These values are forwarded as-is and DefaultSecretStoreProbe logs them directly, which leaks tenant-scoped identifiers into logs.

Proposed fix
+import hashlib
 import json
@@
+def _safe_identifier(value: str) -> str:
+    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]
+
@@
-        self._probe.credential_stored(path, tenant_id)
+        self._probe.credential_stored(
+            _safe_identifier(path), _safe_identifier(tenant_id)
+        )
@@
-            self._probe.credential_not_found(path, tenant_id)
+            self._probe.credential_not_found(
+                _safe_identifier(path), _safe_identifier(tenant_id)
+            )
             raise KeyError("Credentials not found")
@@
-        self._probe.credential_retrieved(path, tenant_id)
+        self._probe.credential_retrieved(
+            _safe_identifier(path), _safe_identifier(tenant_id)
+        )
@@
-        self._probe.credential_deleted(path, tenant_id)
+        self._probe.credential_deleted(
+            _safe_identifier(path), _safe_identifier(tenant_id)
+        )

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

Also applies to: 88-89, 92-93, 111-111

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/infrastructure/repositories/fernet_secret_store.py` at
line 70, Calls to DefaultSecretStoreProbe (e.g., from FernetSecretStore methods
that call self._probe.credential_stored / credential_removed / secret_created /
secret_deleted) are forwarding raw tenant-scoped identifiers (path, tenant_id)
into observability events; instead obfuscate these values before passing them
(for example hash or redact them consistently) and update the probe invocation
sites to pass the obfuscated identifier(s). Locate the call sites in
FernetSecretStore (the lines calling self._probe.credential_stored,
credential_removed, secret_created, secret_deleted) and replace the raw
arguments with a sanitized representation, and if needed adjust
DefaultSecretStoreProbe method signatures to accept/expect the obfuscated
identifier so no raw tenant IDs are logged.


async def retrieve(self, path: str, tenant_id: str) -> dict[str, str]:
"""Decrypt and return stored credentials.

Raises:
KeyError: If no credentials exist at the given path for the tenant.
"""
_validate_inputs(path, tenant_id)

stmt = select(EncryptedCredentialModel).where(
EncryptedCredentialModel.path == path,
EncryptedCredentialModel.tenant_id == tenant_id,
)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()

if model is None:
self._probe.credential_not_found(path, tenant_id)
raise KeyError("Credentials not found")

decrypted = self._multi_fernet.decrypt(model.encrypted_value)
self._probe.credential_retrieved(path, tenant_id)
return json.loads(decrypted.decode("utf-8"))

async def delete(self, path: str, tenant_id: str) -> bool:
"""Remove stored credentials. Returns True if deleted."""
_validate_inputs(path, tenant_id)

stmt = select(EncryptedCredentialModel).where(
EncryptedCredentialModel.path == path,
EncryptedCredentialModel.tenant_id == tenant_id,
)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()

if model is None:
return False

await self._session.delete(model)
await self._session.flush()
self._probe.credential_deleted(path, tenant_id)
return True
Loading
Loading