Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from graph.infrastructure.age_client import AgeGraphClient
from graph.presentation import routes as graph_routes
from iam.presentation import router as iam_router
from management.presentation import management_router
from infrastructure.database.dependencies import (
close_database_engines,
init_database_engines,
Expand Down Expand Up @@ -216,6 +217,9 @@ async def kartograph_lifespan(app: FastAPI):
# Include IAM bounded context routes
app.include_router(iam_router)

# Include Management bounded context routes
app.include_router(management_router)

# Include dev utility routes (easy to remove for production)
app.include_router(dev_routes.router)

Expand Down
122 changes: 79 additions & 43 deletions src/api/management/application/services/data_source_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from datetime import UTC, datetime

from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ulid import ULID

Expand All @@ -18,7 +19,7 @@
from management.domain.aggregates import DataSource
from management.domain.entities import DataSourceSyncRun
from management.domain.value_objects import DataSourceId, KnowledgeGraphId
from management.ports.exceptions import UnauthorizedError
from management.ports.exceptions import DuplicateDataSourceNameError, UnauthorizedError
from management.ports.repositories import (
IDataSourceRepository,
IDataSourceSyncRunRepository,
Expand Down Expand Up @@ -150,26 +151,36 @@ async def create(
if kg.tenant_id != self._scope_to_tenant:
raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant")

async with self._session.begin():
ds = DataSource.create(
knowledge_graph_id=kg_id,
tenant_id=self._scope_to_tenant,
name=name,
adapter_type=adapter_type,
connection_config=connection_config,
created_by=user_id,
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
try:
async with self._session.begin():
ds = DataSource.create(
knowledge_graph_id=kg_id,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
name=name,
adapter_type=adapter_type,
connection_config=connection_config,
created_by=user_id,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
except IntegrityError as e:
Comment on lines +165 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Avoid writing secrets before the database write is known to succeed.

At Line 167 and Line 339, _secret_store.store() runs before the database work has been flushed/committed. If save() or commit then fails, create leaves an orphaned secret and update can rotate credentials even though the request returns an error. It also keeps the transaction open across external I/O. Flush first, then write the secret, and add compensation if anything after the secret write fails.

Also applies to: 337-347

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
165 - 175, The secret is being stored via _secret_store.store(...) before the
database save/commit (_ds_repo.save(ds)) completes, which can create orphaned
secrets and prolong transactions; change the flow so you persist/flush/commit
the datasource first (call _ds_repo.save(ds) and ensure the transaction is
committed or flushed) and only then call _secret_store.store(...) and set
ds.credentials_path, minimizing time inside the DB transaction; additionally,
add compensation logic around the secret write (e.g., if any subsequent
operation after storing the secret fails, call _secret_store.delete(...) to
remove the orphaned secret) and ensure error handling around the try/except
block (including the IntegrityError handler) is updated to account for the new
ordering.

if "uq_data_sources_kg_name" in str(e):
self._probe.data_source_creation_failed(
kg_id=kg_id, name=name, error="duplicate name"
)
raise DuplicateDataSourceNameError(
f"Data source '{name}' already exists in knowledge graph '{kg_id}'"
) from e
raise

self._probe.data_source_created(
ds_id=ds.id.value,
Expand Down Expand Up @@ -219,15 +230,20 @@ async def list_for_knowledge_graph(
self,
user_id: str,
kg_id: str,
) -> list[DataSource]:
"""List data sources for a knowledge graph.
*,
offset: int = 0,
limit: int = 20,
) -> tuple[list[DataSource], int]:
"""List data sources for a knowledge graph with pagination.

Args:
user_id: The user requesting the list
kg_id: The knowledge graph to list DSes for
offset: Number of records to skip
limit: Maximum number of records to return

Returns:
List of DataSource aggregates
Tuple of (paginated DataSource aggregates, total count)

Raises:
UnauthorizedError: If user lacks VIEW permission on KG
Expand All @@ -254,14 +270,16 @@ async def list_for_knowledge_graph(
if kg is None or kg.tenant_id != self._scope_to_tenant:
raise UnauthorizedError(f"Knowledge graph {kg_id} not accessible")

data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id)
data_sources, total = await self._ds_repo.find_by_knowledge_graph(
kg_id, offset=offset, limit=limit
)

self._probe.data_sources_listed(
kg_id=kg_id,
count=len(data_sources),
)

return data_sources
return data_sources, total

async def update(
self,
Expand Down Expand Up @@ -311,27 +329,45 @@ async def update(
if ds.tenant_id != self._scope_to_tenant:
raise ValueError(f"Data source {ds_id} not found")

async with self._session.begin():
if name is not None or connection_config is not None:
ds.update_connection(
name=name if name is not None else ds.name,
connection_config=connection_config
if connection_config is not None
else ds.connection_config,
credentials_path=ds.credentials_path,
updated_by=user_id,
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
try:
async with self._session.begin():
if name is not None or connection_config is not None:
ds.update_connection(
name=name if name is not None else ds.name,
connection_config=connection_config
if connection_config is not None
else ds.connection_config,
credentials_path=ds.credentials_path,
updated_by=user_id,
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
)
# Update via aggregate method to emit event and update timestamps
ds.update_connection(
name=ds.name,
connection_config=ds.connection_config,
credentials_path=cred_path,
updated_by=user_id,
)
Comment on lines +334 to +357
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

A combined PATCH can emit two DataSourceUpdated events.

When a request changes name/config and credentials together, this method calls ds.update_connection() once before the secret write and again after it. In src/api/management/domain/aggregates/data_source.py, DataSource.update_connection() appends a DataSourceUpdated event on every call, so one API request produces duplicate update events and timestamp churn. Compute the final values first and invoke the aggregate once.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
334 - 357, The code currently calls ds.update_connection() twice (before and
after storing secrets), causing duplicate DataSourceUpdated events; instead
compute the final name, connection_config, and credentials_path first, then: if
raw_credentials is provided, await self._secret_store.store(... ) to write
secrets and set credentials_path to f"datasource/{ds.id.value}/credentials",
then call ds.update_connection(...) exactly once with the final values (name,
connection_config, credentials_path, updated_by=user_id); if raw_credentials is
not provided but name/connection_config changed, call ds.update_connection(...)
once with the computed final values; reference ds.update_connection,
DataSource.update_connection, _secret_store.store, raw_credentials, and
credentials_path when making this change.


await self._ds_repo.save(ds)
except IntegrityError as e:
if "uq_data_sources_kg_name" in str(e):
self._probe.data_source_creation_failed(
kg_id=ds.knowledge_graph_id,
name=name or ds.name,
error="duplicate name",
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
raise DuplicateDataSourceNameError(
f"Data source '{name or ds.name}' already exists in knowledge graph '{ds.knowledge_graph_id}'"
) from e
raise

if name is not None:
self._probe.data_source_updated(ds_id=ds_id, name=name)
Expand Down
58 changes: 41 additions & 17 deletions src/api/management/application/services/knowledge_graph_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,18 +200,24 @@ async def list_for_workspace(
self,
user_id: str,
workspace_id: str,
) -> list[KnowledgeGraph]:
"""List knowledge graphs in a workspace.
*,
offset: int = 0,
limit: int = 20,
) -> tuple[list[KnowledgeGraph], int]:
"""List knowledge graphs in a workspace with pagination.

Uses read_relationships to discover KG IDs linked to the workspace,
then fetches each from the repository and filters by tenant.
Pagination is applied after filtering.

Args:
user_id: The user requesting the list
workspace_id: The workspace to list KGs for
offset: Number of records to skip
limit: Maximum number of records to return

Returns:
List of KnowledgeGraph aggregates
Tuple of (paginated KnowledgeGraph aggregates, total count)

Raises:
UnauthorizedError: If user lacks VIEW permission on workspace
Expand Down Expand Up @@ -250,33 +256,37 @@ async def list_for_workspace(
kg_ids.append(parts[1])

# Fetch each KG from repo and filter by tenant
# (N+1 problem - acceptable for walking skeleton)
kgs: list[KnowledgeGraph] = []
for kg_id in kg_ids:
kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
if kg is not None and kg.tenant_id == self._scope_to_tenant:
kgs.append(kg)

total = len(kgs)
paginated = kgs[offset : offset + limit]

self._probe.knowledge_graphs_listed(
workspace_id=workspace_id,
count=len(kgs),
count=len(paginated),
)

return kgs
return paginated, total

async def update(
self,
user_id: str,
kg_id: str,
name: str,
description: str,
name: str | None = None,
description: str | None = None,
) -> KnowledgeGraph:
"""Update a knowledge graph's metadata.

Args:
user_id: The user performing the update
kg_id: The knowledge graph ID
name: New name
description: New description
name: Optional new name (uses existing if None)
description: Optional new description (uses existing if None)

Returns:
The updated KnowledgeGraph aggregate
Expand Down Expand Up @@ -310,17 +320,26 @@ async def update(
if kg.tenant_id != self._scope_to_tenant:
raise ValueError(f"Knowledge graph {kg_id} not found")

kg.update(name=name, description=description, updated_by=user_id)
resolved_name = name if name is not None else kg.name
resolved_description = (
description if description is not None else kg.description
)

kg.update(
name=resolved_name,
description=resolved_description,
updated_by=user_id,
)

try:
async with self._session.begin():
await self._kg_repo.save(kg)
except IntegrityError as e:
raise DuplicateKnowledgeGraphNameError(
f"Knowledge graph '{name}' already exists in tenant"
f"Knowledge graph '{resolved_name}' already exists in tenant"
) from e

self._probe.knowledge_graph_updated(kg_id=kg_id, name=name)
self._probe.knowledge_graph_updated(kg_id=kg_id, name=resolved_name)

return kg

Expand Down Expand Up @@ -366,12 +385,17 @@ async def delete(
return False

async with self._session.begin():
# Cascade delete data sources if repo is available
# Cascade delete all data sources before deleting KG
if self._ds_repo is not None:
data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id)
for ds in data_sources:
ds.mark_for_deletion(deleted_by=user_id)
await self._ds_repo.delete(ds)
while True:
batch, _ = await self._ds_repo.find_by_knowledge_graph(
kg_id, offset=0, limit=100
)
if not batch:
break
for ds in batch:
ds.mark_for_deletion(deleted_by=user_id)
await self._ds_repo.delete(ds)

kg.mark_for_deletion(deleted_by=user_id)
await self._kg_repo.delete(kg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from typing import TYPE_CHECKING

from sqlalchemy import select
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

Expand Down Expand Up @@ -128,17 +128,31 @@ async def get_by_id(self, data_source_id: DataSourceId) -> DataSource | None:
return self._to_domain(model)

async def find_by_knowledge_graph(
    self, knowledge_graph_id: str, *, offset: int = 0, limit: int = 20
) -> tuple[list[DataSource], int]:
    """Return one page of data sources for a knowledge graph plus the total count.

    Args:
        knowledge_graph_id: The knowledge graph whose data sources to fetch.
        offset: Number of records to skip.
        limit: Maximum number of records to return.

    Returns:
        Tuple of (DataSource aggregates for the requested page, total count
        of all data sources in the knowledge graph).
    """
    # Total count of matching rows, independent of pagination.
    count_stmt = (
        select(func.count())
        .select_from(DataSourceModel)
        .where(DataSourceModel.knowledge_graph_id == knowledge_graph_id)
    )
    count_result = await self._session.execute(count_stmt)
    total = count_result.scalar_one()

    # Page query. The id tiebreaker makes the ordering deterministic when
    # rows share the same created_at timestamp, so consecutive pages never
    # overlap or skip rows. (ids are ULIDs, so id order tracks creation order.)
    stmt = (
        select(DataSourceModel)
        .where(DataSourceModel.knowledge_graph_id == knowledge_graph_id)
        .order_by(DataSourceModel.created_at.desc(), DataSourceModel.id.desc())
        .offset(offset)
        .limit(limit)
    )
    result = await self._session.execute(stmt)
    models = result.scalars().all()

    data_sources = [self._to_domain(model) for model in models]
    # Probe reports the page size, not the total — pre-existing behavior.
    self._probe.data_sources_listed(knowledge_graph_id, len(data_sources))
    return data_sources, total

async def delete(self, data_source: DataSource) -> bool:
stmt = select(DataSourceModel).where(DataSourceModel.id == data_source.id.value)
Expand Down
Loading
Loading