From 954a84ea0fafa5c71702ba007a001a2c7cc9d76b Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:27:26 -0400 Subject: [PATCH 1/9] fix(management): add IntegrityError handling to DataSourceService create/update The create() and update() methods in DataSourceService did not catch IntegrityError for duplicate data source names within a knowledge graph. This adds the same pattern used in KnowledgeGraphService.create() to raise DuplicateDataSourceNameError when the uq_data_sources_kg_name constraint is violated. Co-Authored-By: Claude Opus 4.6 --- .../services/data_source_service.py | 99 ++++++++++++------- 1 file changed, 61 insertions(+), 38 deletions(-) diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index 0533c1fc..bb8cf66f 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -8,6 +8,7 @@ from datetime import UTC, datetime +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from ulid import ULID @@ -18,7 +19,7 @@ from management.domain.aggregates import DataSource from management.domain.entities import DataSourceSyncRun from management.domain.value_objects import DataSourceId, KnowledgeGraphId -from management.ports.exceptions import UnauthorizedError +from management.ports.exceptions import DuplicateDataSourceNameError, UnauthorizedError from management.ports.repositories import ( IDataSourceRepository, IDataSourceSyncRunRepository, @@ -150,26 +151,36 @@ async def create( if kg.tenant_id != self._scope_to_tenant: raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant") - async with self._session.begin(): - ds = DataSource.create( - knowledge_graph_id=kg_id, - tenant_id=self._scope_to_tenant, - name=name, - adapter_type=adapter_type, - connection_config=connection_config, - created_by=user_id, - ) - - if raw_credentials is not None: - cred_path = f"datasource/{ds.id.value}/credentials" - await self._secret_store.store( - path=cred_path, + try: + async with self._session.begin(): + ds = DataSource.create( + knowledge_graph_id=kg_id, tenant_id=self._scope_to_tenant, - credentials=raw_credentials, + name=name, + adapter_type=adapter_type, + connection_config=connection_config, + created_by=user_id, ) - ds.credentials_path = cred_path - await self._ds_repo.save(ds) + if raw_credentials is not None: + cred_path = f"datasource/{ds.id.value}/credentials" + await self._secret_store.store( + path=cred_path, + tenant_id=self._scope_to_tenant, + credentials=raw_credentials, + ) + ds.credentials_path = cred_path + + await self._ds_repo.save(ds) + except IntegrityError as e: + if "uq_data_sources_kg_name" in str(e): + self._probe.data_source_creation_failed( + kg_id=kg_id, name=name, error="duplicate name" + ) + raise DuplicateDataSourceNameError( + f"Data source '{name}' already exists in knowledge graph '{kg_id}'" + ) from e + raise self._probe.data_source_created( ds_id=ds.id.value, @@ -311,27 +322,39 @@ async def update( if ds.tenant_id != self._scope_to_tenant: raise ValueError(f"Data source {ds_id} not found") - async with self._session.begin(): - if name is not None or connection_config is not None: - ds.update_connection( - name=name if name is not None else ds.name, - connection_config=connection_config - if connection_config is not None - else ds.connection_config, - credentials_path=ds.credentials_path, - updated_by=user_id, + try: + async with self._session.begin(): + if name is not None or connection_config is not None: + ds.update_connection( + name=name if name is not None else ds.name, + connection_config=connection_config + if connection_config is not None + else ds.connection_config, + credentials_path=ds.credentials_path, + updated_by=user_id, + ) + + if raw_credentials is not None: + cred_path = f"datasource/{ds.id.value}/credentials" + await self._secret_store.store( + path=cred_path, + tenant_id=self._scope_to_tenant, + credentials=raw_credentials, + ) + ds.credentials_path = cred_path + + await self._ds_repo.save(ds) + except IntegrityError as e: + if "uq_data_sources_kg_name" in str(e): + self._probe.data_source_creation_failed( + kg_id=ds.knowledge_graph_id, + name=name or ds.name, + error="duplicate name", ) - - if raw_credentials is not None: - cred_path = f"datasource/{ds.id.value}/credentials" - await self._secret_store.store( - path=cred_path, - tenant_id=self._scope_to_tenant, - credentials=raw_credentials, - ) - ds.credentials_path = cred_path - - await self._ds_repo.save(ds) + raise DuplicateDataSourceNameError( + f"Data source '{name or ds.name}' already exists in knowledge graph '{ds.knowledge_graph_id}'" + ) from e + raise if name is not None: self._probe.data_source_updated(ds_id=ds_id, name=name) From d296a87ad84cb3859e0ce1c50fdfdbc031064dbf Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:28:02 -0400 Subject: [PATCH 2/9] feat(management): support partial updates in KnowledgeGraphService.update() Change update() signature to accept optional name and description parameters. When None is passed, the existing values are preserved. This enables PATCH semantics in the upcoming route handlers. Co-Authored-By: Claude Opus 4.6 --- .../services/knowledge_graph_service.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py index ca187f04..f57b0f55 100644 --- a/src/api/management/application/services/knowledge_graph_service.py +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -267,16 +267,16 @@ async def update( self, user_id: str, kg_id: str, - name: str, - description: str, + name: str | None = None, + description: str | None = None, ) -> KnowledgeGraph: """Update a knowledge graph's metadata. Args: user_id: The user performing the update kg_id: The knowledge graph ID - name: New name - description: New description + name: Optional new name (uses existing if None) + description: Optional new description (uses existing if None) Returns: The updated KnowledgeGraph aggregate @@ -310,17 +310,26 @@ async def update( if kg.tenant_id != self._scope_to_tenant: raise ValueError(f"Knowledge graph {kg_id} not found") - kg.update(name=name, description=description, updated_by=user_id) + resolved_name = name if name is not None else kg.name + resolved_description = ( + description if description is not None else kg.description + ) + + kg.update( + name=resolved_name, + description=resolved_description, + updated_by=user_id, + ) try: async with self._session.begin(): await self._kg_repo.save(kg) except IntegrityError as e: raise DuplicateKnowledgeGraphNameError( - f"Knowledge graph '{name}' already exists in tenant" + f"Knowledge graph '{resolved_name}' already exists in tenant" ) from e - self._probe.knowledge_graph_updated(kg_id=kg_id, name=name) + self._probe.knowledge_graph_updated(kg_id=kg_id, name=resolved_name) return kg From 5103aa6c1f4417bce4a05bf0eba8a9c048f1bffe Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:32:16 -0400 Subject: [PATCH 3/9] feat(management): add presentation layer with KG and DS routes (AIHCM-185) Add FastAPI routes for Knowledge Graph and Data Source CRUD operations: Knowledge Graph routes: - POST /management/workspaces/{workspace_id}/knowledge-graphs - GET /management/workspaces/{workspace_id}/knowledge-graphs (paginated) - GET /management/knowledge-graphs/{kg_id} - PATCH /management/knowledge-graphs/{kg_id} - DELETE /management/knowledge-graphs/{kg_id} Data Source routes: - POST /management/knowledge-graphs/{kg_id}/data-sources - GET /management/knowledge-graphs/{kg_id}/data-sources (paginated) - GET /management/data-sources/{ds_id} - PATCH /management/data-sources/{ds_id} - DELETE /management/data-sources/{ds_id} - POST /management/data-sources/{ds_id}/sync (202 Accepted) Includes Pydantic request/response models, DI wiring, router registration in main.py, and 41 unit tests covering happy paths, error handling (403/404/409/422), and pagination. Architecture test updated to exclude management.presentation from IAM import restriction (legitimate auth dependency at the boundary). Co-Authored-By: Claude Opus 4.6 --- src/api/main.py | 4 + src/api/management/presentation/__init__.py | 22 + .../presentation/data_sources/__init__.py | 1 + .../presentation/data_sources/models.py | 155 +++++++ .../presentation/data_sources/routes.py | 231 ++++++++++ .../presentation/knowledge_graphs/__init__.py | 1 + .../presentation/knowledge_graphs/models.py | 91 ++++ .../presentation/knowledge_graphs/routes.py | 191 ++++++++ .../unit/management/presentation/__init__.py | 0 .../presentation/test_data_source_routes.py | 434 ++++++++++++++++++ .../test_knowledge_graph_routes.py | 376 +++++++++++++++ .../unit/management/test_architecture.py | 10 +- 12 files changed, 1512 insertions(+), 4 deletions(-) create mode 100644 src/api/management/presentation/__init__.py create mode 100644 src/api/management/presentation/data_sources/__init__.py create mode 100644 src/api/management/presentation/data_sources/models.py create mode 100644 src/api/management/presentation/data_sources/routes.py create mode 100644 src/api/management/presentation/knowledge_graphs/__init__.py create mode 100644 src/api/management/presentation/knowledge_graphs/models.py create mode 100644 src/api/management/presentation/knowledge_graphs/routes.py create mode 100644 src/api/tests/unit/management/presentation/__init__.py create mode 100644 src/api/tests/unit/management/presentation/test_data_source_routes.py create mode 100644 src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py diff --git a/src/api/main.py b/src/api/main.py index 50153a8a..2404cd47 100644 --- a/src/api/main.py +++ b/src/api/main.py @@ -11,6 +11,7 @@ from graph.infrastructure.age_client import AgeGraphClient from graph.presentation import routes as graph_routes from iam.presentation import router as iam_router +from management.presentation import management_router from infrastructure.database.dependencies import ( close_database_engines, init_database_engines, @@ -216,6 +217,9 @@ async def kartograph_lifespan(app: FastAPI): # Include IAM bounded context routes app.include_router(iam_router) +# Include Management bounded context routes +app.include_router(management_router) + # Include dev utility routes (easy to remove for production) app.include_router(dev_routes.router) diff --git a/src/api/management/presentation/__init__.py b/src/api/management/presentation/__init__.py new file mode 100644 index 00000000..0135f72a --- /dev/null +++ b/src/api/management/presentation/__init__.py @@ -0,0 +1,22 @@ +"""Management presentation layer. + +Aggregates sub-routers for knowledge graphs and data sources, +exporting a single management_router for registration in main.py. +""" + +from __future__ import annotations + +from fastapi import APIRouter + +from management.presentation.data_sources.routes import router as ds_router +from management.presentation.knowledge_graphs.routes import router as kg_router + +management_router = APIRouter( + prefix="/management", + tags=["management"], +) + +management_router.include_router(kg_router) +management_router.include_router(ds_router) + +__all__ = ["management_router"] diff --git a/src/api/management/presentation/data_sources/__init__.py b/src/api/management/presentation/data_sources/__init__.py new file mode 100644 index 00000000..67042ba5 --- /dev/null +++ b/src/api/management/presentation/data_sources/__init__.py @@ -0,0 +1 @@ +"""Data source presentation sub-package.""" diff --git a/src/api/management/presentation/data_sources/models.py b/src/api/management/presentation/data_sources/models.py new file mode 100644 index 00000000..4d4c32dc --- /dev/null +++ b/src/api/management/presentation/data_sources/models.py @@ -0,0 +1,155 @@ +"""Request and response models for Data Source API endpoints.""" + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + +from management.domain.aggregates import DataSource +from management.domain.entities import DataSourceSyncRun + + +class CreateDataSourceRequest(BaseModel): + """Request to create a data source. + + Attributes: + name: Data source name (1-100 characters) + adapter_type: Adapter type string (validated against DataSourceAdapterType in route) + connection_config: Key-value connection configuration + credentials: Optional write-only credentials (never returned in responses) + """ + + name: str = Field(min_length=1, max_length=100) + adapter_type: str + connection_config: dict[str, str] + credentials: dict[str, str] | None = None + + +class UpdateDataSourceRequest(BaseModel): + """Request to partially update a data source. + + Attributes: + name: Optional new name (1-100 characters) + connection_config: Optional new connection configuration + credentials: Optional new credentials (write-only) + """ + + name: str | None = Field(default=None, min_length=1, max_length=100) + connection_config: dict[str, str] | None = None + credentials: dict[str, str] | None = None + + +class DataSourceResponse(BaseModel): + """Response containing data source details. + + Credentials are never returned. Instead, has_credentials indicates + whether credentials have been configured. + + Attributes: + id: Data source ID (ULID) + knowledge_graph_id: Parent knowledge graph ID + tenant_id: Tenant ID this data source belongs to + name: Data source name + adapter_type: Adapter type string + connection_config: Connection configuration key-value pairs + has_credentials: Whether credentials are configured + schedule_type: Schedule type (manual, cron, interval) + schedule_value: Schedule expression (None for manual) + last_sync_at: Last successful sync timestamp + created_at: Creation timestamp + updated_at: Last update timestamp + """ + + id: str + knowledge_graph_id: str + tenant_id: str + name: str + adapter_type: str + connection_config: dict[str, str] + has_credentials: bool + schedule_type: str + schedule_value: str | None + last_sync_at: datetime | None + created_at: datetime + updated_at: datetime + + @classmethod + def from_domain(cls, ds: DataSource) -> DataSourceResponse: + """Convert domain DataSource aggregate to API response. + + Args: + ds: DataSource domain aggregate + + Returns: + DataSourceResponse with data source details + """ + return cls( + id=ds.id.value, + knowledge_graph_id=ds.knowledge_graph_id, + tenant_id=ds.tenant_id, + name=ds.name, + adapter_type=ds.adapter_type.value, + connection_config=ds.connection_config, + has_credentials=ds.credentials_path is not None, + schedule_type=ds.schedule.schedule_type.value, + schedule_value=ds.schedule.value, + last_sync_at=ds.last_sync_at, + created_at=ds.created_at, + updated_at=ds.updated_at, + ) + + +class DataSourceListResponse(BaseModel): + """Response containing a paginated list of data sources. + + Attributes: + items: List of data source details + total: Total number of data sources (before pagination) + offset: Number of items skipped + limit: Maximum number of items returned + """ + + items: list[DataSourceResponse] + total: int + offset: int + limit: int + + +class SyncRunResponse(BaseModel): + """Response containing sync run details. + + Attributes: + id: Sync run ID + data_source_id: Data source this sync belongs to + status: Sync run status (pending, running, completed, failed) + started_at: Sync start timestamp + completed_at: Sync completion timestamp (None if not complete) + created_at: Record creation timestamp + """ + + id: str + data_source_id: str + status: str + started_at: datetime + completed_at: datetime | None + created_at: datetime + + @classmethod + def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse: + """Convert domain DataSourceSyncRun entity to API response. + + Args: + sync_run: DataSourceSyncRun domain entity + + Returns: + SyncRunResponse with sync run details + """ + return cls( + id=sync_run.id, + data_source_id=sync_run.data_source_id, + status=sync_run.status, + started_at=sync_run.started_at, + completed_at=sync_run.completed_at, + created_at=sync_run.created_at, + ) diff --git a/src/api/management/presentation/data_sources/routes.py b/src/api/management/presentation/data_sources/routes.py new file mode 100644 index 00000000..9327b55d --- /dev/null +++ b/src/api/management/presentation/data_sources/routes.py @@ -0,0 +1,231 @@ +"""Data source management routes.""" + +from __future__ import annotations + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query, status + +from iam.application.value_objects import CurrentUser +from iam.dependencies.user import get_current_user +from management.application.services.data_source_service import DataSourceService +from management.dependencies.data_source import get_data_source_service +from management.ports.exceptions import ( + DuplicateDataSourceNameError, + UnauthorizedError, +) +from management.presentation.data_sources.models import ( + CreateDataSourceRequest, + DataSourceListResponse, + DataSourceResponse, + SyncRunResponse, + UpdateDataSourceRequest, +) +from shared_kernel.datasource_types import DataSourceAdapterType + +router = APIRouter(tags=["Data Sources"]) + + +@router.post( + "/knowledge-graphs/{kg_id}/data-sources", + response_model=DataSourceResponse, + status_code=status.HTTP_201_CREATED, + summary="Create a data source", +) +async def create_data_source( + kg_id: str, + request: CreateDataSourceRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> DataSourceResponse: + """Create a new data source in a knowledge graph.""" + try: + adapter_type = DataSourceAdapterType(request.adapter_type) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid adapter type: {request.adapter_type}", + ) + + try: + ds = await service.create( + user_id=current_user.user_id.value, + kg_id=kg_id, + name=request.name, + adapter_type=adapter_type, + connection_config=request.connection_config, + raw_credentials=request.credentials, + ) + return DataSourceResponse.from_domain(ds) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateDataSourceNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A data source with this name already exists in this knowledge graph", + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + + +@router.get( + "/knowledge-graphs/{kg_id}/data-sources", + response_model=DataSourceListResponse, + summary="List data sources for a knowledge graph", +) +async def list_data_sources( + kg_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], + offset: int = Query(default=0, ge=0), + limit: int = Query(default=20, ge=1, le=100), +) -> DataSourceListResponse: + """List data sources for a knowledge graph with pagination.""" + try: + all_ds = await service.list_for_knowledge_graph( + user_id=current_user.user_id.value, + kg_id=kg_id, + ) + total = len(all_ds) + paginated = all_ds[offset : offset + limit] + return DataSourceListResponse( + items=[DataSourceResponse.from_domain(ds) for ds in paginated], + total=total, + offset=offset, + limit=limit, + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + + +@router.get( + "/data-sources/{ds_id}", + response_model=DataSourceResponse, + summary="Get a data source by ID", +) +async def get_data_source( + ds_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> DataSourceResponse: + """Get a data source by ID.""" + ds = await service.get(user_id=current_user.user_id.value, ds_id=ds_id) + if ds is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + return DataSourceResponse.from_domain(ds) + + +@router.patch( + "/data-sources/{ds_id}", + response_model=DataSourceResponse, + summary="Update a data source", +) +async def update_data_source( + ds_id: str, + request: UpdateDataSourceRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> DataSourceResponse: + """Update a data source's configuration.""" + try: + ds = await service.update( + user_id=current_user.user_id.value, + ds_id=ds_id, + name=request.name, + connection_config=request.connection_config, + raw_credentials=request.credentials, + ) + return DataSourceResponse.from_domain(ds) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateDataSourceNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A data source with this name already exists in this knowledge graph", + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + + +@router.delete( + "/data-sources/{ds_id}", + status_code=status.HTTP_204_NO_CONTENT, + response_model=None, + summary="Delete a data source", +) +async def delete_data_source( + ds_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> None: + """Delete a data source.""" + try: + result = await service.delete( + user_id=current_user.user_id.value, + ds_id=ds_id, + ) + if not result: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to delete data source", + ) + + +@router.post( + "/data-sources/{ds_id}/sync", + response_model=SyncRunResponse, + status_code=status.HTTP_202_ACCEPTED, + summary="Trigger a data source sync", +) +async def trigger_sync( + ds_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> SyncRunResponse: + """Trigger a sync for a data source.""" + try: + sync_run = await service.trigger_sync( + user_id=current_user.user_id.value, + ds_id=ds_id, + ) + return SyncRunResponse.from_domain(sync_run) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e), + ) diff --git a/src/api/management/presentation/knowledge_graphs/__init__.py b/src/api/management/presentation/knowledge_graphs/__init__.py new file mode 100644 index 00000000..2f20c9b1 --- /dev/null +++ b/src/api/management/presentation/knowledge_graphs/__init__.py @@ -0,0 +1 @@ +"""Knowledge graph presentation sub-package.""" diff --git a/src/api/management/presentation/knowledge_graphs/models.py b/src/api/management/presentation/knowledge_graphs/models.py new file mode 100644 index 00000000..0a898362 --- /dev/null +++ b/src/api/management/presentation/knowledge_graphs/models.py @@ -0,0 +1,91 @@ +"""Request and response models for Knowledge Graph API endpoints.""" + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + +from management.domain.aggregates import KnowledgeGraph + + +class CreateKnowledgeGraphRequest(BaseModel): + """Request to create a knowledge graph. + + Attributes: + name: Knowledge graph name (1-100 characters) + description: Optional description (defaults to empty string) + """ + + name: str = Field(min_length=1, max_length=100) + description: str = Field(default="") + + +class UpdateKnowledgeGraphRequest(BaseModel): + """Request to partially update a knowledge graph. + + Attributes: + name: Optional new name (1-100 characters) + description: Optional new description + """ + + name: str | None = Field(default=None, min_length=1, max_length=100) + description: str | None = Field(default=None) + + +class KnowledgeGraphResponse(BaseModel): + """Response containing knowledge graph details. + + Attributes: + id: Knowledge graph ID (ULID) + tenant_id: Tenant ID this knowledge graph belongs to + workspace_id: Workspace ID containing this knowledge graph + name: Knowledge graph name + description: Knowledge graph description + created_at: Creation timestamp + updated_at: Last update timestamp + """ + + id: str + tenant_id: str + workspace_id: str + name: str + description: str + created_at: datetime + updated_at: datetime + + @classmethod + def from_domain(cls, kg: KnowledgeGraph) -> KnowledgeGraphResponse: + """Convert domain KnowledgeGraph aggregate to API response. + + Args: + kg: KnowledgeGraph domain aggregate + + Returns: + KnowledgeGraphResponse with knowledge graph details + """ + return cls( + id=kg.id.value, + tenant_id=kg.tenant_id, + workspace_id=kg.workspace_id, + name=kg.name, + description=kg.description, + created_at=kg.created_at, + updated_at=kg.updated_at, + ) + + +class KnowledgeGraphListResponse(BaseModel): + """Response containing a paginated list of knowledge graphs. + + Attributes: + items: List of knowledge graph details + total: Total number of knowledge graphs (before pagination) + offset: Number of items skipped + limit: Maximum number of items returned + """ + + items: list[KnowledgeGraphResponse] + total: int + offset: int + limit: int diff --git a/src/api/management/presentation/knowledge_graphs/routes.py b/src/api/management/presentation/knowledge_graphs/routes.py new file mode 100644 index 00000000..1084bb0f --- /dev/null +++ b/src/api/management/presentation/knowledge_graphs/routes.py @@ -0,0 +1,191 @@ +"""Knowledge graph management routes.""" + +from __future__ import annotations + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query, status + +from iam.application.value_objects import CurrentUser +from iam.dependencies.user import get_current_user +from management.application.services.knowledge_graph_service import ( + KnowledgeGraphService, +) +from management.dependencies.knowledge_graph import get_knowledge_graph_service +from management.domain.exceptions import InvalidKnowledgeGraphNameError +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) +from management.presentation.knowledge_graphs.models import ( + CreateKnowledgeGraphRequest, + KnowledgeGraphListResponse, + KnowledgeGraphResponse, + UpdateKnowledgeGraphRequest, +) + +router = APIRouter(tags=["Knowledge Graphs"]) + + +@router.post( + "/workspaces/{workspace_id}/knowledge-graphs", + response_model=KnowledgeGraphResponse, + status_code=status.HTTP_201_CREATED, + summary="Create a knowledge graph", +) +async def create_knowledge_graph( + workspace_id: str, + request: CreateKnowledgeGraphRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> KnowledgeGraphResponse: + """Create a new knowledge graph in a workspace.""" + try: + kg = await service.create( + user_id=current_user.user_id.value, + workspace_id=workspace_id, + name=request.name, + description=request.description, + ) + return KnowledgeGraphResponse.from_domain(kg) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateKnowledgeGraphNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A knowledge graph with this name already exists", + ) + except (InvalidKnowledgeGraphNameError, ValueError) as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + + +@router.get( + "/workspaces/{workspace_id}/knowledge-graphs", + response_model=KnowledgeGraphListResponse, + summary="List knowledge graphs in a workspace", +) +async def list_knowledge_graphs( + workspace_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], + offset: int = Query(default=0, ge=0), + limit: int = Query(default=20, ge=1, le=100), +) -> KnowledgeGraphListResponse: + """List knowledge graphs in a workspace with pagination.""" + try: + all_kgs = await service.list_for_workspace( + user_id=current_user.user_id.value, + workspace_id=workspace_id, + ) + total = len(all_kgs) + paginated = all_kgs[offset : offset + limit] + return KnowledgeGraphListResponse( + items=[KnowledgeGraphResponse.from_domain(kg) for kg in paginated], + total=total, + offset=offset, + limit=limit, + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + + +@router.get( + "/knowledge-graphs/{kg_id}", + response_model=KnowledgeGraphResponse, + summary="Get a knowledge graph by ID", +) +async def get_knowledge_graph( + kg_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> KnowledgeGraphResponse: + """Get a knowledge graph by ID.""" + kg = await service.get(user_id=current_user.user_id.value, kg_id=kg_id) + if kg is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) + return KnowledgeGraphResponse.from_domain(kg) + + +@router.patch( + "/knowledge-graphs/{kg_id}", + response_model=KnowledgeGraphResponse, + summary="Update a knowledge graph", +) +async def update_knowledge_graph( + kg_id: str, + request: UpdateKnowledgeGraphRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> KnowledgeGraphResponse: + """Update a knowledge graph's metadata.""" + try: + kg = await service.update( + user_id=current_user.user_id.value, + kg_id=kg_id, + name=request.name, + description=request.description, + ) + return KnowledgeGraphResponse.from_domain(kg) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateKnowledgeGraphNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A knowledge graph with this name already exists", + ) + except (InvalidKnowledgeGraphNameError, ValueError) as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + + +@router.delete( + "/knowledge-graphs/{kg_id}", + status_code=status.HTTP_204_NO_CONTENT, + response_model=None, + summary="Delete a knowledge graph", +) +async def delete_knowledge_graph( + kg_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> None: + """Delete a knowledge graph.""" + try: + result = await service.delete( + user_id=current_user.user_id.value, + kg_id=kg_id, + ) + if not result: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to delete knowledge graph", + ) diff --git a/src/api/tests/unit/management/presentation/__init__.py b/src/api/tests/unit/management/presentation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/tests/unit/management/presentation/test_data_source_routes.py b/src/api/tests/unit/management/presentation/test_data_source_routes.py new file mode 100644 index 00000000..901cad7f --- /dev/null +++ b/src/api/tests/unit/management/presentation/test_data_source_routes.py @@ -0,0 +1,434 @@ +"""Unit tests for Data Source route handlers. + +Tests route-level behavior including status codes, response shapes, +error handling, and pagination. Service dependencies are mocked. +""" + +from __future__ import annotations + +from datetime import UTC, datetime +from unittest.mock import AsyncMock + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient + +from iam.application.value_objects import CurrentUser +from iam.domain.value_objects import TenantId, UserId +from management.domain.aggregates import DataSource +from management.domain.entities import DataSourceSyncRun +from management.domain.value_objects import DataSourceId, Schedule, ScheduleType +from management.ports.exceptions import ( + DuplicateDataSourceNameError, + UnauthorizedError, +) +from shared_kernel.datasource_types import DataSourceAdapterType + +# Fixed test data +TENANT_ID = "test-tenant-id" +KG_ID = "01JTEST00000000000000KG001" +DS_ID = "01JTEST00000000000000DS001" +USER_ID = "test-user-id" +NOW = datetime(2025, 1, 1, tzinfo=UTC) + + +def _make_current_user() -> CurrentUser: + """Create a CurrentUser for dependency override.""" + return CurrentUser( + user_id=UserId(value=USER_ID), + username="testuser", + tenant_id=TenantId(value=TENANT_ID), + ) + + +def _make_ds( + ds_id: str = DS_ID, + name: str = "Test DS", + credentials_path: str | None = None, +) -> DataSource: + """Create a DataSource aggregate for testing.""" + return DataSource( + id=DataSourceId(value=ds_id), + knowledge_graph_id=KG_ID, + tenant_id=TENANT_ID, + name=name, + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"owner": "test", "repo": "test-repo"}, + credentials_path=credentials_path, + schedule=Schedule(schedule_type=ScheduleType.MANUAL), + last_sync_at=None, + created_at=NOW, + updated_at=NOW, + ) + + +def _make_sync_run(ds_id: str = DS_ID) -> DataSourceSyncRun: + """Create a DataSourceSyncRun entity for testing.""" + return DataSourceSyncRun( + id="01JTEST0000000000000SYNC01", + data_source_id=ds_id, + status="pending", + started_at=NOW, + completed_at=None, + error=None, + created_at=NOW, + ) + + +@pytest_asyncio.fixture +async def mock_service(): + """Create a mock DataSourceService.""" + return AsyncMock() + + +@pytest_asyncio.fixture +async def client(mock_service): + """Create an async HTTP client with mocked dependencies.""" + from main import app + + from iam.dependencies.user import get_current_user + from management.dependencies.data_source import get_data_source_service + + app.dependency_overrides[get_current_user] = lambda: _make_current_user() + app.dependency_overrides[get_data_source_service] = lambda: mock_service + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as c: + yield c + + app.dependency_overrides.clear() + + +class TestCreateDataSource: + """Tests for POST /management/knowledge-graphs/{kg_id}/data-sources.""" + + @pytest.mark.asyncio + async def test_creates_successfully(self, client, mock_service): + """Test successful creation returns 201 with correct response shape.""" + ds = _make_ds() + mock_service.create.return_value = ds + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["id"] == DS_ID + assert data["knowledge_graph_id"] == KG_ID + assert data["tenant_id"] == TENANT_ID + assert data["name"] == "Test DS" + assert data["adapter_type"] == "github" + assert data["connection_config"] == {"owner": "test", "repo": "test-repo"} + assert data["has_credentials"] is False + assert data["schedule_type"] == "manual" + assert data["schedule_value"] is None + assert data["last_sync_at"] is None + + @pytest.mark.asyncio + async def test_creates_with_credentials(self, client, mock_service): + """Test creation with credentials sets has_credentials=true.""" + ds = _make_ds(credentials_path="datasource/ds1/credentials") + mock_service.create.return_value = ds + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + "credentials": {"token": "secret"}, + }, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["has_credentials"] is True + # Credentials should NOT be in response + assert "credentials" not in data + assert "credentials_path" not in data + + mock_service.create.assert_awaited_once_with( + user_id=USER_ID, + kg_id=KG_ID, + name="Test DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"owner": "test", "repo": "test-repo"}, + raw_credentials={"token": "secret"}, + ) + + @pytest.mark.asyncio + async def test_invalid_adapter_type_returns_400(self, client, mock_service): + """Test that invalid adapter type returns 400.""" + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "invalid_type", + "connection_config": {}, + }, + ) + + assert resp.status_code == 400 + assert "adapter type" in resp.json()["detail"].lower() + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.create.side_effect = UnauthorizedError("denied") + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateDataSourceNameError maps to 409.""" + mock_service.create.side_effect = DuplicateDataSourceNameError("dup") + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_empty_name_returns_422(self, client, mock_service): + """Test that Pydantic validation rejects empty name.""" + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 422 + + +class TestListDataSources: + """Tests for GET /management/knowledge-graphs/{kg_id}/data-sources.""" + + @pytest.mark.asyncio + async def test_lists_successfully(self, client, mock_service): + """Test successful list returns 200 with correct pagination.""" + data_sources = [ + _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(3) + ] + mock_service.list_for_knowledge_graph.return_value = data_sources + + resp = await client.get( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["items"]) == 3 + assert data["offset"] == 0 + assert data["limit"] == 20 + + @pytest.mark.asyncio + async def test_pagination_offset_limit(self, client, mock_service): + """Test that offset and limit query params work correctly.""" + data_sources = [ + _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(5) + ] + mock_service.list_for_knowledge_graph.return_value = data_sources + + resp = await client.get( + f"/management/knowledge-graphs/{KG_ID}/data-sources?offset=2&limit=2", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 5 + assert len(data["items"]) == 2 + assert data["offset"] == 2 + assert data["limit"] == 2 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.list_for_knowledge_graph.side_effect = UnauthorizedError("denied") + + resp = await client.get( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + ) + + assert resp.status_code == 403 + + +class TestGetDataSource: + """Tests for GET /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_gets_successfully(self, client, mock_service): + """Test successful get returns 200 with correct response.""" + ds = _make_ds() + mock_service.get.return_value = ds + + resp = await client.get(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == DS_ID + assert data["name"] == "Test DS" + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that None from service maps to 404.""" + mock_service.get.return_value = None + + resp = await client.get(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 404 + + +class TestUpdateDataSource: + """Tests for PATCH /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_updates_successfully(self, client, mock_service): + """Test successful update returns 200.""" + ds = _make_ds(name="Updated DS") + mock_service.update.return_value = ds + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "Updated DS"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated DS" + + mock_service.update.assert_awaited_once_with( + user_id=USER_ID, + ds_id=DS_ID, + name="Updated DS", + connection_config=None, + raw_credentials=None, + ) + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.update.side_effect = UnauthorizedError("denied") + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateDataSourceNameError maps to 409.""" + mock_service.update.side_effect = DuplicateDataSourceNameError("dup") + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "Existing Name"}, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_not_found_returns_400(self, client, mock_service): + """Test that ValueError from service maps to 400.""" + mock_service.update.side_effect = ValueError("not found") + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 400 + + +class TestDeleteDataSource: + """Tests for DELETE /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_deletes_successfully(self, client, mock_service): + """Test successful delete returns 204.""" + mock_service.delete.return_value = True + + resp = await client.delete(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 204 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that False from service maps to 404.""" + mock_service.delete.return_value = False + + resp = await client.delete(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.delete.side_effect = UnauthorizedError("denied") + + resp = await client.delete(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 403 + + +class TestTriggerSync: + """Tests for POST /management/data-sources/{ds_id}/sync.""" + + @pytest.mark.asyncio + async def test_triggers_successfully(self, client, mock_service): + """Test successful sync trigger returns 202.""" + sync_run = _make_sync_run() + mock_service.trigger_sync.return_value = sync_run + + resp = await client.post(f"/management/data-sources/{DS_ID}/sync") + + assert resp.status_code == 202 + data = resp.json() + assert data["id"] == "01JTEST0000000000000SYNC01" + assert data["data_source_id"] == DS_ID + assert data["status"] == "pending" + assert data["completed_at"] is None + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.trigger_sync.side_effect = UnauthorizedError("denied") + + resp = await client.post(f"/management/data-sources/{DS_ID}/sync") + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that ValueError maps to 404.""" + mock_service.trigger_sync.side_effect = ValueError("not found") + + resp = await client.post(f"/management/data-sources/{DS_ID}/sync") + + assert resp.status_code == 404 diff --git a/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py new file mode 100644 index 00000000..302d52d1 --- /dev/null +++ b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py @@ -0,0 +1,376 @@ +"""Unit tests for Knowledge Graph route handlers. + +Tests route-level behavior including status codes, response shapes, +error handling, and pagination. Service dependencies are mocked. +""" + +from __future__ import annotations + +from datetime import UTC, datetime +from unittest.mock import AsyncMock + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient + +from iam.application.value_objects import CurrentUser +from iam.domain.value_objects import TenantId, UserId +from management.domain.aggregates import KnowledgeGraph +from management.domain.value_objects import KnowledgeGraphId +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) + +# Fixed test data +TENANT_ID = "test-tenant-id" +WORKSPACE_ID = "test-workspace-id" +USER_ID = "test-user-id" +KG_ID = "01JTEST00000000000000KG001" +NOW = datetime(2025, 1, 1, tzinfo=UTC) + + +def _make_current_user() -> CurrentUser: + """Create a CurrentUser for dependency override.""" + return CurrentUser( + user_id=UserId(value=USER_ID), + username="testuser", + tenant_id=TenantId(value=TENANT_ID), + ) + + +def _make_kg( + kg_id: str = KG_ID, + name: str = "Test KG", + description: str = "A test knowledge graph", +) -> KnowledgeGraph: + """Create a KnowledgeGraph aggregate for testing.""" + return KnowledgeGraph( + id=KnowledgeGraphId(value=kg_id), + tenant_id=TENANT_ID, + workspace_id=WORKSPACE_ID, + name=name, + description=description, + created_at=NOW, + updated_at=NOW, + ) + + +@pytest_asyncio.fixture +async def mock_service(): + """Create a mock KnowledgeGraphService.""" + return AsyncMock() + + +@pytest_asyncio.fixture +async def client(mock_service): + """Create an async HTTP client with mocked dependencies.""" + from main import app + + from iam.dependencies.user import get_current_user + from management.dependencies.knowledge_graph import get_knowledge_graph_service + + app.dependency_overrides[get_current_user] = lambda: _make_current_user() + app.dependency_overrides[get_knowledge_graph_service] = lambda: mock_service + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as c: + yield c + + app.dependency_overrides.clear() + + +class TestCreateKnowledgeGraph: + """Tests for POST /management/workspaces/{workspace_id}/knowledge-graphs.""" + + @pytest.mark.asyncio + async def test_creates_successfully(self, client, mock_service): + """Test successful creation returns 201 with correct response shape.""" + kg = _make_kg() + mock_service.create.return_value = kg + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG", "description": "A test knowledge graph"}, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["id"] == KG_ID + assert data["tenant_id"] == TENANT_ID + assert data["workspace_id"] == WORKSPACE_ID + assert data["name"] == "Test KG" + assert data["description"] == "A test knowledge graph" + assert "created_at" in data + assert "updated_at" in data + + mock_service.create.assert_awaited_once_with( + user_id=USER_ID, + workspace_id=WORKSPACE_ID, + name="Test KG", + description="A test knowledge graph", + ) + + @pytest.mark.asyncio + async def test_default_description(self, client, mock_service): + """Test that description defaults to empty string.""" + kg = _make_kg(description="") + mock_service.create.return_value = kg + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG"}, + ) + + assert resp.status_code == 201 + mock_service.create.assert_awaited_once_with( + user_id=USER_ID, + workspace_id=WORKSPACE_ID, + name="Test KG", + description="", + ) + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.create.side_effect = UnauthorizedError("denied") + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG"}, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateKnowledgeGraphNameError maps to 409.""" + mock_service.create.side_effect = DuplicateKnowledgeGraphNameError("dup") + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG"}, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_empty_name_returns_422(self, client, mock_service): + """Test that Pydantic validation rejects empty name.""" + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": ""}, + ) + + assert resp.status_code == 422 + + @pytest.mark.asyncio + async def test_name_too_long_returns_422(self, client, mock_service): + """Test that Pydantic validation rejects names over 100 characters.""" + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "x" * 101}, + ) + + assert resp.status_code == 422 + + +class TestListKnowledgeGraphs: + """Tests for GET /management/workspaces/{workspace_id}/knowledge-graphs.""" + + @pytest.mark.asyncio + async def test_lists_successfully(self, client, mock_service): + """Test successful list returns 200 with correct pagination.""" + kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(3)] + mock_service.list_for_workspace.return_value = kgs + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["items"]) == 3 + assert data["offset"] == 0 + assert data["limit"] == 20 + + @pytest.mark.asyncio + async def test_pagination_offset_limit(self, client, mock_service): + """Test that offset and limit query params work correctly.""" + kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(5)] + mock_service.list_for_workspace.return_value = kgs + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs?offset=1&limit=2", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 5 + assert len(data["items"]) == 2 + assert data["offset"] == 1 + assert data["limit"] == 2 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.list_for_workspace.side_effect = UnauthorizedError("denied") + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_empty_list(self, client, mock_service): + """Test listing returns empty result correctly.""" + mock_service.list_for_workspace.return_value = [] + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["items"] == [] + + +class TestGetKnowledgeGraph: + """Tests for GET /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_gets_successfully(self, client, mock_service): + """Test successful get returns 200 with correct response.""" + kg = _make_kg() + mock_service.get.return_value = kg + + resp = await client.get(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == KG_ID + assert data["name"] == "Test KG" + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that None from service maps to 404.""" + mock_service.get.return_value = None + + resp = await client.get(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 404 + + +class TestUpdateKnowledgeGraph: + """Tests for PATCH /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_updates_successfully(self, client, mock_service): + """Test successful update returns 200 with updated data.""" + kg = _make_kg(name="Updated KG") + mock_service.update.return_value = kg + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "Updated KG"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated KG" + + mock_service.update.assert_awaited_once_with( + user_id=USER_ID, + kg_id=KG_ID, + name="Updated KG", + description=None, + ) + + @pytest.mark.asyncio + async def test_partial_update_description_only(self, client, mock_service): + """Test updating only description passes None for name.""" + kg = _make_kg(description="New desc") + mock_service.update.return_value = kg + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"description": "New desc"}, + ) + + assert resp.status_code == 200 + mock_service.update.assert_awaited_once_with( + user_id=USER_ID, + kg_id=KG_ID, + name=None, + description="New desc", + ) + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.update.side_effect = UnauthorizedError("denied") + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateKnowledgeGraphNameError maps to 409.""" + mock_service.update.side_effect = DuplicateKnowledgeGraphNameError("dup") + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "Existing Name"}, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_not_found_returns_400(self, client, mock_service): + """Test that ValueError from service maps to 400.""" + mock_service.update.side_effect = ValueError("not found") + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 400 + + +class TestDeleteKnowledgeGraph: + """Tests for DELETE /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_deletes_successfully(self, client, mock_service): + """Test successful delete returns 204.""" + mock_service.delete.return_value = True + + resp = await client.delete(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 204 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that False from service maps to 404.""" + mock_service.delete.return_value = False + + resp = await client.delete(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.delete.side_effect = UnauthorizedError("denied") + + resp = await client.delete(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 403 diff --git a/src/api/tests/unit/management/test_architecture.py b/src/api/tests/unit/management/test_architecture.py index dd6addae..56f1e372 100644 --- a/src/api/tests/unit/management/test_architecture.py +++ b/src/api/tests/unit/management/test_architecture.py @@ -227,14 +227,16 @@ def test_management_does_not_import_iam(self): IAM manages authentication and authorization. Management should not couple to IAM's user, tenant, or API key domain objects. - The management.dependencies package is excluded because DI wiring - is a presentation-layer concern that legitimately crosses context - boundaries (e.g., extracting CurrentUser from IAM for tenant scoping). + The management.dependencies and management.presentation packages + are excluded because DI wiring and route handlers are + presentation-layer concerns that legitimately cross context + boundaries (e.g., extracting CurrentUser from IAM for tenant + scoping and auth dependency injection). """ ( archrule("management_no_iam") .match("management*") - .exclude("management.dependencies*") + .exclude("management.dependencies*", "management.presentation*") .should_not_import("iam*") .check("management") ) From 724aa0ffe73d25dbb022436df5be0bb8a3c7b1d6 Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:35:43 -0400 Subject: [PATCH 4/9] test(management): add integration tests for KG and DS API endpoints Add integration tests that exercise the full stack: HTTP request -> route -> DI -> service -> SpiceDB + PostgreSQL -> response. Tests cover: - KG creation, listing with pagination, get, update, delete - DS creation with/without credentials, sync trigger (202) - Authorization enforcement (403 for unauthorized users) - Duplicate name detection (409) - Not found handling (404) Includes shared conftest with cleanup fixtures, SpiceDB permission helpers, and encryption key setup. Co-Authored-By: Claude Opus 4.6 --- .../tests/integration/management/conftest.py | 245 +++++++-------- .../management/test_data_source_api.py | 208 +++++++++++++ .../management/test_knowledge_graph_api.py | 286 ++++++++++++++++++ 3 files changed, 602 insertions(+), 137 deletions(-) create mode 100644 src/api/tests/integration/management/test_data_source_api.py create mode 100644 src/api/tests/integration/management/test_knowledge_graph_api.py diff --git a/src/api/tests/integration/management/conftest.py b/src/api/tests/integration/management/conftest.py index 8167f93b..7dbba4d6 100644 --- a/src/api/tests/integration/management/conftest.py +++ b/src/api/tests/integration/management/conftest.py @@ -1,7 +1,6 @@ """Integration test fixtures for Management bounded context. -These fixtures require a running PostgreSQL instance. -Database settings follow the same pattern as IAM integration tests. +These fixtures require running PostgreSQL, SpiceDB, and Keycloak instances. """ from __future__ import annotations @@ -11,31 +10,33 @@ import pytest import pytest_asyncio +from asgi_lifespan import LifespanManager +from cryptography.fernet import Fernet +from httpx import ASGITransport, AsyncClient +from jose import jwt as jose_jwt from pydantic import SecretStr from sqlalchemy import text -from sqlalchemy.exc import ProgrammingError from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from ulid import ULID +from infrastructure.authorization_dependencies import get_spicedb_client from infrastructure.database.engines import create_write_engine -from infrastructure.outbox.repository import OutboxRepository from infrastructure.settings import DatabaseSettings -from management.infrastructure.repositories.data_source_repository import ( - DataSourceRepository, -) -from management.infrastructure.repositories.data_source_sync_run_repository import ( - DataSourceSyncRunRepository, -) -from management.infrastructure.repositories.knowledge_graph_repository import ( - KnowledgeGraphRepository, +from shared_kernel.authorization.protocols import AuthorizationProvider +from shared_kernel.authorization.types import ( + Permission, + ResourceType, + format_resource, + format_subject, ) +from tests.integration.iam.conftest import wait_for_permission -pytestmark = pytest.mark.integration +# Ensure encryption key is available for management services +os.environ.setdefault("KARTOGRAPH_MGMT_ENCRYPTION_KEY", Fernet.generate_key().decode()) @pytest.fixture(scope="session") -def management_db_settings() -> DatabaseSettings: - """Database settings for Management integration tests.""" +def mgmt_db_settings() -> DatabaseSettings: + """Database settings for management integration tests.""" return DatabaseSettings( host=os.getenv("KARTOGRAPH_DB_HOST", "localhost"), port=int(os.getenv("KARTOGRAPH_DB_PORT", "5432")), @@ -49,11 +50,11 @@ def management_db_settings() -> DatabaseSettings: @pytest_asyncio.fixture -async def async_session( - management_db_settings: DatabaseSettings, +async def mgmt_async_session( + mgmt_db_settings: DatabaseSettings, ) -> AsyncGenerator[AsyncSession, None]: - """Provide an async session for integration tests.""" - engine = create_write_engine(management_db_settings) + """Provide an async session for management integration tests.""" + engine = create_write_engine(mgmt_db_settings) sessionmaker = async_sessionmaker(engine, expire_on_commit=False) async with sessionmaker() as session: @@ -63,152 +64,122 @@ async def async_session( @pytest_asyncio.fixture -async def session_factory( - management_db_settings: DatabaseSettings, -) -> AsyncGenerator[async_sessionmaker[AsyncSession], None]: - """Provide a session factory for tests that need to create multiple sessions. +async def async_client(): + """Create async HTTP client for testing with lifespan support.""" + from main import app - This is needed for components that create their own sessions. - """ - engine = create_write_engine(management_db_settings) - factory = async_sessionmaker(engine, expire_on_commit=False) + async with LifespanManager(app): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + yield client - yield factory - await engine.dispose() +@pytest.fixture +def spicedb_client() -> AuthorizationProvider: + """Provide a SpiceDB client for integration tests.""" + return get_spicedb_client() + + +@pytest.fixture +def alice_user_id(alice_token: str) -> str: + """Extract the actual user_id (sub claim) from alice's JWT token.""" + claims = jose_jwt.get_unverified_claims(alice_token) + return claims["sub"] @pytest_asyncio.fixture async def clean_management_data( - async_session: AsyncSession, + mgmt_async_session: AsyncSession, ) -> AsyncGenerator[None, None]: - """Clean Management tables before and after tests. + """Clean management tables before and after tests. Deletion order respects FK constraints: - outbox (management events) -> data_source_sync_runs -> - data_sources -> knowledge_graphs - - Guards against tables not existing yet (TDD-first: tests may be - collected before migration runs). + data_source_sync_runs -> data_sources -> knowledge_graphs """ async def cleanup() -> None: - """Perform cleanup with proper FK constraint ordering.""" try: - # Clean encrypted credentials first - await async_session.execute(text("DELETE FROM encrypted_credentials")) - # Clean management-related outbox entries - await async_session.execute( + await mgmt_async_session.execute(text("DELETE FROM data_source_sync_runs")) + await mgmt_async_session.execute(text("DELETE FROM data_sources")) + await mgmt_async_session.execute(text("DELETE FROM knowledge_graphs")) + # Clean outbox entries related to management + await mgmt_async_session.execute( text( - "DELETE FROM outbox WHERE aggregate_type " - "IN ('knowledge_graph', 'data_source')" + "DELETE FROM outbox WHERE aggregate_type IN " + "('KnowledgeGraph', 'DataSource')" ) ) - await async_session.execute(text("DELETE FROM data_source_sync_runs")) - await async_session.execute(text("DELETE FROM data_sources")) - await async_session.execute(text("DELETE FROM knowledge_graphs")) - await async_session.commit() - except ProgrammingError: - # Tables may not exist yet if migration hasn't run - await async_session.rollback() - - # Clean before test - await cleanup() + await mgmt_async_session.commit() + except Exception: + await mgmt_async_session.rollback() + raise + await cleanup() yield - - # Clean after test await cleanup() -@pytest.fixture -def knowledge_graph_repository( - async_session: AsyncSession, -) -> KnowledgeGraphRepository: - """Provide a KnowledgeGraphRepository for integration tests.""" - outbox = OutboxRepository(session=async_session) - return KnowledgeGraphRepository(session=async_session, outbox=outbox) - - -@pytest.fixture -def data_source_repository( - async_session: AsyncSession, -) -> DataSourceRepository: - """Provide a DataSourceRepository for integration tests.""" - outbox = OutboxRepository(session=async_session) - return DataSourceRepository(session=async_session, outbox=outbox) - - -@pytest.fixture -def data_source_sync_run_repository( - async_session: AsyncSession, -) -> DataSourceSyncRunRepository: - """Provide a DataSourceSyncRunRepository for integration tests.""" - return DataSourceSyncRunRepository(session=async_session) - +async def grant_kg_permission( + spicedb_client: AuthorizationProvider, + user_id: str, + kg_id: str, + workspace_id: str, +) -> None: + """Set up SpiceDB relationships for a KG. -@pytest_asyncio.fixture -async def test_tenant( - async_session: AsyncSession, - clean_management_data: None, -) -> AsyncGenerator[str, None]: - """Create a tenant in the tenants table for FK satisfaction. - - Uses raw SQL to insert directly, avoiding dependency on IAM domain objects. - Cleans up the test tenant on teardown. - Returns the tenant_id string. + Writes the workspace relationship on the KG and grants + admin permission on the KG to the user. """ - tenant_id = str(ULID()) - await async_session.execute( - text( - "INSERT INTO tenants (id, name, created_at, updated_at) " - "VALUES (:id, :name, NOW(), NOW())" - ), - {"id": tenant_id, "name": f"test-tenant-{tenant_id}"}, + kg_resource = format_resource(ResourceType.KNOWLEDGE_GRAPH, kg_id) + user_subject = format_subject(ResourceType.USER, user_id) + ws_subject = format_resource(ResourceType.WORKSPACE, workspace_id) + + # Write workspace parent relationship + await spicedb_client.write_relationship( + resource=kg_resource, + relation="workspace", + subject=ws_subject, ) - await async_session.commit() - yield tenant_id + # Wait for view permission (inherited from workspace) + view_ready = await wait_for_permission( + spicedb_client, + resource=kg_resource, + permission=Permission.VIEW, + subject=user_subject, + timeout=5.0, + ) + assert view_ready, "Timed out waiting for KG view permission" - # Teardown: remove test tenant (workspaces cleaned first by test_workspace) - try: - await async_session.execute( - text("DELETE FROM workspaces WHERE tenant_id = :tid"), - {"tid": tenant_id}, - ) - await async_session.execute( - text("DELETE FROM tenants WHERE id = :tid"), - {"tid": tenant_id}, - ) - await async_session.commit() - except ProgrammingError: - await async_session.rollback() +async def grant_ds_permission( + spicedb_client: AuthorizationProvider, + user_id: str, + ds_id: str, + kg_id: str, +) -> None: + """Set up SpiceDB relationships for a DataSource. -@pytest_asyncio.fixture -async def test_workspace( - async_session: AsyncSession, - test_tenant: str, -) -> AsyncGenerator[str, None]: - """Create a workspace in the workspaces table for FK satisfaction. - - Depends on test_tenant to ensure a valid tenant_id FK reference. - Uses raw SQL to insert directly, avoiding dependency on IAM domain objects. - Returns the workspace_id string. + Writes the knowledge_graph relationship on the DS so permissions + inherit from the KG. """ - workspace_id = str(ULID()) - await async_session.execute( - text( - "INSERT INTO workspaces (id, tenant_id, name, is_root, created_at, updated_at) " - "VALUES (:id, :tenant_id, :name, :is_root, NOW(), NOW())" - ), - { - "id": workspace_id, - "tenant_id": test_tenant, - "name": f"test-workspace-{workspace_id}", - "is_root": True, - }, + ds_resource = format_resource(ResourceType.DATA_SOURCE, ds_id) + user_subject = format_subject(ResourceType.USER, user_id) + kg_subject = format_resource(ResourceType.KNOWLEDGE_GRAPH, kg_id) + + # Write knowledge_graph parent relationship + await spicedb_client.write_relationship( + resource=ds_resource, + relation="knowledge_graph", + subject=kg_subject, ) - await async_session.commit() - yield workspace_id + # Wait for view permission (inherited from KG) + view_ready = await wait_for_permission( + spicedb_client, + resource=ds_resource, + permission=Permission.VIEW, + subject=user_subject, + timeout=5.0, + ) + assert view_ready, "Timed out waiting for DS view permission" diff --git a/src/api/tests/integration/management/test_data_source_api.py b/src/api/tests/integration/management/test_data_source_api.py new file mode 100644 index 00000000..b898e26d --- /dev/null +++ b/src/api/tests/integration/management/test_data_source_api.py @@ -0,0 +1,208 @@ +"""Integration tests for Data Source API endpoints. + +Tests the full stack: HTTP request -> route -> DI -> service -> +SpiceDB + PostgreSQL -> response. +""" + +from __future__ import annotations + +import pytest +from httpx import AsyncClient + +from shared_kernel.authorization.protocols import AuthorizationProvider +from tests.integration.management.conftest import ( + grant_ds_permission, + grant_kg_permission, +) + +pytestmark = [pytest.mark.integration, pytest.mark.keycloak] + + +async def _get_root_workspace_id( + async_client: AsyncClient, tenant_auth_headers: dict +) -> str: + """Get the root workspace ID for the default tenant.""" + resp = await async_client.get("/iam/workspaces", headers=tenant_auth_headers) + assert resp.status_code == 200 + workspaces = resp.json()["workspaces"] + root = next((w for w in workspaces if w["is_root"]), None) + assert root is not None, "Root workspace should exist" + return root["id"] + + +async def _create_kg( + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + workspace_id: str, + name: str = "Test KG for DS", +) -> str: + """Create a KG and set up SpiceDB permissions. Returns the KG ID.""" + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": name}, + ) + assert resp.status_code == 201, f"Failed to create KG: {resp.text}" + kg_id = resp.json()["id"] + + await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id) + return kg_id + + +class TestDataSourceCreation: + """Tests for POST /management/knowledge-graphs/{kg_id}/data-sources.""" + + @pytest.mark.asyncio + async def test_creates_data_source( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test creating a DS via API returns 201 with correct fields.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + ) + + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + + assert resp.status_code == 201, f"Unexpected: {resp.status_code} {resp.text}" + data = resp.json() + assert data["name"] == "Test DS" + assert data["adapter_type"] == "github" + assert data["knowledge_graph_id"] == kg_id + assert data["has_credentials"] is False + assert data["schedule_type"] == "manual" + assert "id" in data + + @pytest.mark.asyncio + async def test_creates_data_source_with_credentials( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test creating a DS with credentials sets has_credentials=true.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + name="KG with creds", + ) + + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": "DS with creds", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + "credentials": {"token": "ghp_secrettoken123"}, + }, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["has_credentials"] is True + # Credentials should NOT be in response + assert "credentials" not in data + assert "credentials_path" not in data + + +class TestDataSourceTriggerSync: + """Tests for POST /management/data-sources/{ds_id}/sync.""" + + @pytest.mark.asyncio + async def test_trigger_sync_returns_202( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test triggering a sync returns 202 with sync run details.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + name="KG for sync", + ) + + # Create DS + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": "Sync DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + assert resp.status_code == 201 + ds_id = resp.json()["id"] + + # Set up SpiceDB permissions for the DS + await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id) + + # Trigger sync + resp = await async_client.post( + f"/management/data-sources/{ds_id}/sync", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 202 + data = resp.json() + assert data["data_source_id"] == ds_id + assert data["status"] == "pending" + assert "id" in data + assert "started_at" in data + + +class TestDataSourceAuthorization: + """Tests for authorization enforcement on DS endpoints.""" + + @pytest.mark.asyncio + async def test_unauthorized_user_gets_403( + self, + async_client: AsyncClient, + bob_tenant_auth_headers: dict, + clean_management_data, + ): + """Test that a user without KG edit permission gets 403.""" + resp = await async_client.post( + "/management/knowledge-graphs/01JFAKEKG0000000000000000000/data-sources", + headers=bob_tenant_auth_headers, + json={ + "name": "Unauthorized DS", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 403 diff --git a/src/api/tests/integration/management/test_knowledge_graph_api.py b/src/api/tests/integration/management/test_knowledge_graph_api.py new file mode 100644 index 00000000..4ca9ca8f --- /dev/null +++ b/src/api/tests/integration/management/test_knowledge_graph_api.py @@ -0,0 +1,286 @@ +"""Integration tests for Knowledge Graph API endpoints. + +Tests the full stack: HTTP request -> route -> DI -> service -> +SpiceDB + PostgreSQL -> response. +""" + +from __future__ import annotations + +import pytest +from httpx import AsyncClient + +from shared_kernel.authorization.protocols import AuthorizationProvider +from tests.integration.management.conftest import grant_kg_permission + +pytestmark = [pytest.mark.integration, pytest.mark.keycloak] + + +async def _get_root_workspace_id( + async_client: AsyncClient, tenant_auth_headers: dict +) -> str: + """Get the root workspace ID for the default tenant.""" + resp = await async_client.get("/iam/workspaces", headers=tenant_auth_headers) + assert resp.status_code == 200 + workspaces = resp.json()["workspaces"] + root = next((w for w in workspaces if w["is_root"]), None) + assert root is not None, "Root workspace should exist" + return root["id"] + + +class TestKnowledgeGraphCreation: + """Tests for POST /management/workspaces/{workspace_id}/knowledge-graphs.""" + + @pytest.mark.asyncio + async def test_creates_knowledge_graph( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + clean_management_data, + ): + """Test creating a KG via API returns 201 with correct fields.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Test KG", "description": "Integration test KG"}, + ) + + assert resp.status_code == 201, f"Unexpected: {resp.status_code} {resp.text}" + data = resp.json() + assert data["name"] == "Test KG" + assert data["description"] == "Integration test KG" + assert data["workspace_id"] == workspace_id + assert "id" in data + assert "tenant_id" in data + assert "created_at" in data + assert "updated_at" in data + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + clean_management_data, + ): + """Test that creating a KG with duplicate name returns 409.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + # Create first KG + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Unique KG"}, + ) + assert resp.status_code == 201 + + # Attempt duplicate + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Unique KG"}, + ) + assert resp.status_code == 409 + + +class TestKnowledgeGraphList: + """Tests for GET /management/workspaces/{workspace_id}/knowledge-graphs.""" + + @pytest.mark.asyncio + async def test_lists_knowledge_graphs_with_pagination( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test listing KGs with pagination.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + # Create 3 KGs + kg_ids = [] + for i in range(3): + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": f"KG {i}"}, + ) + assert resp.status_code == 201 + kg_id = resp.json()["id"] + kg_ids.append(kg_id) + + # Set up SpiceDB relationships for listing + await grant_kg_permission( + spicedb_client, alice_user_id, kg_id, workspace_id + ) + + # List with pagination + resp = await async_client.get( + f"/management/workspaces/{workspace_id}/knowledge-graphs?offset=0&limit=2", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["items"]) == 2 + assert data["offset"] == 0 + assert data["limit"] == 2 + + +class TestKnowledgeGraphGet: + """Tests for GET /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_gets_knowledge_graph( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test getting a KG by ID returns 200.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + # Create KG + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Get Test KG"}, + ) + assert resp.status_code == 201 + kg_id = resp.json()["id"] + + # Grant SpiceDB permissions + await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id) + + # Get KG + resp = await async_client.get( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == kg_id + assert data["name"] == "Get Test KG" + + @pytest.mark.asyncio + async def test_get_nonexistent_returns_404( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + clean_management_data, + ): + """Test getting a nonexistent KG returns 404.""" + resp = await async_client.get( + "/management/knowledge-graphs/01JNONEXISTENT000000000000", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 404 + + +class TestKnowledgeGraphUpdate: + """Tests for PATCH /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_updates_knowledge_graph( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test updating a KG returns 200 with updated data.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + # Create KG + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Original Name"}, + ) + assert resp.status_code == 201 + kg_id = resp.json()["id"] + + # Grant SpiceDB permissions + await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id) + + # Update KG + resp = await async_client.patch( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + json={"name": "Updated Name"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated Name" + + +class TestKnowledgeGraphDelete: + """Tests for DELETE /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_deletes_knowledge_graph( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test deleting a KG returns 204.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + # Create KG + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Delete Test KG"}, + ) + assert resp.status_code == 201 + kg_id = resp.json()["id"] + + # Grant SpiceDB permissions (need manage for delete) + await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id) + + # Delete KG + resp = await async_client.delete( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 204 + + # Verify it's gone + resp = await async_client.get( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + ) + assert resp.status_code == 404 + + +class TestKnowledgeGraphAuthorization: + """Tests for authorization enforcement on KG endpoints.""" + + @pytest.mark.asyncio + async def test_unauthorized_user_gets_403( + self, + async_client: AsyncClient, + bob_tenant_auth_headers: dict, + clean_management_data, + ): + """Test that a user without workspace edit permission gets 403.""" + # Bob has tenant membership but no workspace admin/edit + resp = await async_client.post( + "/management/workspaces/01JFAKEWORKSPACE00000000000/knowledge-graphs", + headers=bob_tenant_auth_headers, + json={"name": "Unauthorized KG"}, + ) + + assert resp.status_code == 403 From cfce7119a87183a6f33829dc7668b8bb17d3cfc2 Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:41:08 -0400 Subject: [PATCH 5/9] fix(management): fix HTTP status codes, add defensive error handling, and expand integration tests - Return 404 (not 400) for not-found resources in update routes - Add defensive UnauthorizedError handler on get routes - Use generic error messages in trigger_sync to prevent information leakage - Add integration tests for DS get, list, update, and delete endpoints Co-Authored-By: Claude Opus 4.6 --- .../presentation/data_sources/routes.py | 28 ++- .../presentation/knowledge_graphs/routes.py | 24 +- .../management/test_data_source_api.py | 210 ++++++++++++++++++ .../presentation/test_data_source_routes.py | 6 +- .../test_knowledge_graph_routes.py | 6 +- 5 files changed, 254 insertions(+), 20 deletions(-) diff --git a/src/api/management/presentation/data_sources/routes.py b/src/api/management/presentation/data_sources/routes.py index 9327b55d..f996dd22 100644 --- a/src/api/management/presentation/data_sources/routes.py +++ b/src/api/management/presentation/data_sources/routes.py @@ -118,13 +118,19 @@ async def get_data_source( service: Annotated[DataSourceService, Depends(get_data_source_service)], ) -> DataSourceResponse: """Get a data source by ID.""" - ds = await service.get(user_id=current_user.user_id.value, ds_id=ds_id) - if ds is None: + try: + ds = await service.get(user_id=current_user.user_id.value, ds_id=ds_id) + if ds is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + return DataSourceResponse.from_domain(ds) + except UnauthorizedError: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Data source not found", + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", ) - return DataSourceResponse.from_domain(ds) @router.patch( @@ -159,9 +165,15 @@ async def update_data_source( detail="A data source with this name already exists in this knowledge graph", ) except ValueError as e: + error_msg = str(e) + if "not found" in error_msg: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail=str(e), + detail=error_msg, ) @@ -224,8 +236,8 @@ async def trigger_sync( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action", ) - except ValueError as e: + except ValueError: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail=str(e), + detail="Data source not found", ) diff --git a/src/api/management/presentation/knowledge_graphs/routes.py b/src/api/management/presentation/knowledge_graphs/routes.py index 1084bb0f..554ab726 100644 --- a/src/api/management/presentation/knowledge_graphs/routes.py +++ b/src/api/management/presentation/knowledge_graphs/routes.py @@ -109,13 +109,19 @@ async def get_knowledge_graph( service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], ) -> KnowledgeGraphResponse: """Get a knowledge graph by ID.""" - kg = await service.get(user_id=current_user.user_id.value, kg_id=kg_id) - if kg is None: + try: + kg = await service.get(user_id=current_user.user_id.value, kg_id=kg_id) + if kg is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) + return KnowledgeGraphResponse.from_domain(kg) + except UnauthorizedError: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Knowledge graph not found", + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", ) - return KnowledgeGraphResponse.from_domain(kg) @router.patch( @@ -149,9 +155,15 @@ async def update_knowledge_graph( detail="A knowledge graph with this name already exists", ) except (InvalidKnowledgeGraphNameError, ValueError) as e: + error_msg = str(e) + if "not found" in error_msg: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail=str(e), + detail=error_msg, ) diff --git a/src/api/tests/integration/management/test_data_source_api.py b/src/api/tests/integration/management/test_data_source_api.py index b898e26d..dacc269f 100644 --- a/src/api/tests/integration/management/test_data_source_api.py +++ b/src/api/tests/integration/management/test_data_source_api.py @@ -184,6 +184,216 @@ async def test_trigger_sync_returns_202( assert "started_at" in data +class TestGetDataSource: + """Tests for GET /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_get_data_source_by_id( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test getting a DS by ID returns 200 with correct fields.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + name="KG for get DS", + ) + + # Create DS + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": "Get DS Test", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + assert resp.status_code == 201 + ds_id = resp.json()["id"] + + # Set up SpiceDB permissions for the DS + await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id) + + # Get DS by ID + resp = await async_client.get( + f"/management/data-sources/{ds_id}", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == ds_id + assert data["name"] == "Get DS Test" + assert data["adapter_type"] == "github" + assert data["knowledge_graph_id"] == kg_id + assert data["has_credentials"] is False + assert data["schedule_type"] == "manual" + + +class TestListDataSources: + """Tests for GET /management/knowledge-graphs/{kg_id}/data-sources.""" + + @pytest.mark.asyncio + async def test_list_data_sources_for_kg( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test listing DSes for a KG returns 200 with pagination.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + name="KG for list DS", + ) + + # Create 2 DSes + for i in range(2): + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": f"List DS {i}", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": f"repo-{i}"}, + }, + ) + assert resp.status_code == 201 + + # List DSes for KG + resp = await async_client.get( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 2 + assert len(data["items"]) == 2 + assert "offset" in data + assert "limit" in data + + +class TestUpdateDataSource: + """Tests for PATCH /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_update_data_source( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test updating a DS returns 200 with updated name.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + name="KG for update DS", + ) + + # Create DS + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": "Original DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + assert resp.status_code == 201 + ds_id = resp.json()["id"] + + # Set up SpiceDB permissions for the DS + await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id) + + # Update DS + resp = await async_client.patch( + f"/management/data-sources/{ds_id}", + headers=tenant_auth_headers, + json={"name": "Updated Name"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated Name" + + +class TestDeleteDataSource: + """Tests for DELETE /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_delete_data_source( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test deleting a DS returns 204 and subsequent GET returns 404.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + kg_id = await _create_kg( + async_client, + tenant_auth_headers, + spicedb_client, + alice_user_id, + workspace_id, + name="KG for delete DS", + ) + + # Create DS + resp = await async_client.post( + f"/management/knowledge-graphs/{kg_id}/data-sources", + headers=tenant_auth_headers, + json={ + "name": "Delete DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + assert resp.status_code == 201 + ds_id = resp.json()["id"] + + # Set up SpiceDB permissions for the DS + await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id) + + # Delete DS + resp = await async_client.delete( + f"/management/data-sources/{ds_id}", + headers=tenant_auth_headers, + ) + assert resp.status_code == 204 + + # Verify DS is gone + resp = await async_client.get( + f"/management/data-sources/{ds_id}", + headers=tenant_auth_headers, + ) + assert resp.status_code == 404 + + class TestDataSourceAuthorization: """Tests for authorization enforcement on DS endpoints.""" diff --git a/src/api/tests/unit/management/presentation/test_data_source_routes.py b/src/api/tests/unit/management/presentation/test_data_source_routes.py index 901cad7f..1234a0ce 100644 --- a/src/api/tests/unit/management/presentation/test_data_source_routes.py +++ b/src/api/tests/unit/management/presentation/test_data_source_routes.py @@ -354,8 +354,8 @@ async def test_duplicate_name_returns_409(self, client, mock_service): assert resp.status_code == 409 @pytest.mark.asyncio - async def test_not_found_returns_400(self, client, mock_service): - """Test that ValueError from service maps to 400.""" + async def test_not_found_returns_404(self, client, mock_service): + """Test that ValueError with 'not found' from service maps to 404.""" mock_service.update.side_effect = ValueError("not found") resp = await client.patch( @@ -363,7 +363,7 @@ async def test_not_found_returns_400(self, client, mock_service): json={"name": "New Name"}, ) - assert resp.status_code == 400 + assert resp.status_code == 404 class TestDeleteDataSource: diff --git a/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py index 302d52d1..cfb6b13b 100644 --- a/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py +++ b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py @@ -333,8 +333,8 @@ async def test_duplicate_name_returns_409(self, client, mock_service): assert resp.status_code == 409 @pytest.mark.asyncio - async def test_not_found_returns_400(self, client, mock_service): - """Test that ValueError from service maps to 400.""" + async def test_not_found_returns_404(self, client, mock_service): + """Test that ValueError with 'not found' from service maps to 404.""" mock_service.update.side_effect = ValueError("not found") resp = await client.patch( @@ -342,7 +342,7 @@ async def test_not_found_returns_400(self, client, mock_service): json={"name": "New Name"}, ) - assert resp.status_code == 400 + assert resp.status_code == 404 class TestDeleteKnowledgeGraph: From 74e6ab05a1093446e10bb5d5c794d649dccc32a3 Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:49:47 -0400 Subject: [PATCH 6/9] fix(management): add catch-all error handlers and database-level pagination - Add generic except Exception catch-all to all route handlers matching IAM pattern, preventing unhandled exceptions from leaking stack traces - Move pagination from application-layer slicing to database-level LIMIT/OFFSET queries in repositories - Repository find methods now return (items, total_count) tuples - Service list methods pass through pagination parameters - Routes pass offset/limit query params directly to services Co-Authored-By: Claude Opus 4.6 --- .../services/data_source_service.py | 17 ++++--- .../services/knowledge_graph_service.py | 24 +++++++--- .../repositories/data_source_repository.py | 26 ++++++++--- .../knowledge_graph_repository.py | 26 ++++++++--- src/api/management/ports/repositories.py | 20 ++++++--- .../presentation/data_sources/routes.py | 45 ++++++++++++++++--- .../presentation/knowledge_graphs/routes.py | 38 +++++++++++++--- .../management/test_data_source_repository.py | 10 ++++- .../test_knowledge_graph_repository.py | 8 +++- .../application/test_data_source_service.py | 9 ++-- .../test_knowledge_graph_service.py | 12 ++--- .../presentation/test_data_source_routes.py | 7 +-- .../test_knowledge_graph_routes.py | 9 ++-- 13 files changed, 193 insertions(+), 58 deletions(-) diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index bb8cf66f..8560e6c6 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -230,15 +230,20 @@ async def list_for_knowledge_graph( self, user_id: str, kg_id: str, - ) -> list[DataSource]: - """List data sources for a knowledge graph. + *, + offset: int = 0, + limit: int = 20, + ) -> tuple[list[DataSource], int]: + """List data sources for a knowledge graph with pagination. Args: user_id: The user requesting the list kg_id: The knowledge graph to list DSes for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of DataSource aggregates + Tuple of (paginated DataSource aggregates, total count) Raises: UnauthorizedError: If user lacks VIEW permission on KG @@ -265,14 +270,16 @@ async def list_for_knowledge_graph( if kg is None or kg.tenant_id != self._scope_to_tenant: raise UnauthorizedError(f"Knowledge graph {kg_id} not accessible") - data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id) + data_sources, total = await self._ds_repo.find_by_knowledge_graph( + kg_id, offset=offset, limit=limit + ) self._probe.data_sources_listed( kg_id=kg_id, count=len(data_sources), ) - return data_sources + return data_sources, total async def update( self, diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py index f57b0f55..d037eeca 100644 --- a/src/api/management/application/services/knowledge_graph_service.py +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -200,18 +200,24 @@ async def list_for_workspace( self, user_id: str, workspace_id: str, - ) -> list[KnowledgeGraph]: - """List knowledge graphs in a workspace. + *, + offset: int = 0, + limit: int = 20, + ) -> tuple[list[KnowledgeGraph], int]: + """List knowledge graphs in a workspace with pagination. Uses read_relationships to discover KG IDs linked to the workspace, then fetches each from the repository and filters by tenant. + Pagination is applied after filtering. Args: user_id: The user requesting the list workspace_id: The workspace to list KGs for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of KnowledgeGraph aggregates + Tuple of (paginated KnowledgeGraph aggregates, total count) Raises: UnauthorizedError: If user lacks VIEW permission on workspace @@ -250,18 +256,22 @@ async def list_for_workspace( kg_ids.append(parts[1]) # Fetch each KG from repo and filter by tenant + # (N+1 problem - acceptable for walking skeleton) kgs: list[KnowledgeGraph] = [] for kg_id in kg_ids: kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) if kg is not None and kg.tenant_id == self._scope_to_tenant: kgs.append(kg) + total = len(kgs) + paginated = kgs[offset : offset + limit] + self._probe.knowledge_graphs_listed( workspace_id=workspace_id, - count=len(kgs), + count=len(paginated), ) - return kgs + return paginated, total async def update( self, @@ -377,7 +387,9 @@ async def delete( async with self._session.begin(): # Cascade delete data sources if repo is available if self._ds_repo is not None: - data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id) + data_sources, _ = await self._ds_repo.find_by_knowledge_graph( + kg_id, offset=0, limit=10000 + ) for ds in data_sources: ds.mark_for_deletion(deleted_by=user_id) await self._ds_repo.delete(ds) diff --git a/src/api/management/infrastructure/repositories/data_source_repository.py b/src/api/management/infrastructure/repositories/data_source_repository.py index 7901a1f0..05aff7fc 100644 --- a/src/api/management/infrastructure/repositories/data_source_repository.py +++ b/src/api/management/infrastructure/repositories/data_source_repository.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession @@ -128,17 +128,31 @@ async def get_by_id(self, data_source_id: DataSourceId) -> DataSource | None: return self._to_domain(model) async def find_by_knowledge_graph( - self, knowledge_graph_id: str - ) -> list[DataSource]: - stmt = select(DataSourceModel).where( - DataSourceModel.knowledge_graph_id == knowledge_graph_id + self, knowledge_graph_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[DataSource], int]: + # Count query + count_stmt = ( + select(func.count()) + .select_from(DataSourceModel) + .where(DataSourceModel.knowledge_graph_id == knowledge_graph_id) + ) + count_result = await self._session.execute(count_stmt) + total = count_result.scalar_one() + + # Paginated query + stmt = ( + select(DataSourceModel) + .where(DataSourceModel.knowledge_graph_id == knowledge_graph_id) + .offset(offset) + .limit(limit) + .order_by(DataSourceModel.created_at.desc()) ) result = await self._session.execute(stmt) models = result.scalars().all() data_sources = [self._to_domain(model) for model in models] self._probe.data_sources_listed(knowledge_graph_id, len(data_sources)) - return data_sources + return data_sources, total async def delete(self, data_source: DataSource) -> bool: stmt = select(DataSourceModel).where(DataSourceModel.id == data_source.id.value) diff --git a/src/api/management/infrastructure/repositories/knowledge_graph_repository.py b/src/api/management/infrastructure/repositories/knowledge_graph_repository.py index f7b0b8c4..1d4bf059 100644 --- a/src/api/management/infrastructure/repositories/knowledge_graph_repository.py +++ b/src/api/management/infrastructure/repositories/knowledge_graph_repository.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession @@ -121,16 +121,32 @@ async def get_by_id( self._probe.knowledge_graph_retrieved(knowledge_graph_id.value) return self._to_domain(model) - async def find_by_tenant(self, tenant_id: str) -> list[KnowledgeGraph]: - stmt = select(KnowledgeGraphModel).where( - KnowledgeGraphModel.tenant_id == tenant_id + async def find_by_tenant( + self, tenant_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[KnowledgeGraph], int]: + # Count query + count_stmt = ( + select(func.count()) + .select_from(KnowledgeGraphModel) + .where(KnowledgeGraphModel.tenant_id == tenant_id) + ) + count_result = await self._session.execute(count_stmt) + total = count_result.scalar_one() + + # Paginated query + stmt = ( + select(KnowledgeGraphModel) + .where(KnowledgeGraphModel.tenant_id == tenant_id) + .offset(offset) + .limit(limit) + .order_by(KnowledgeGraphModel.created_at.desc()) ) result = await self._session.execute(stmt) models = result.scalars().all() kgs = [self._to_domain(model) for model in models] self._probe.knowledge_graphs_listed(tenant_id, len(kgs)) - return kgs + return kgs, total async def delete(self, knowledge_graph: KnowledgeGraph) -> bool: stmt = select(KnowledgeGraphModel).where( diff --git a/src/api/management/ports/repositories.py b/src/api/management/ports/repositories.py index 3a634699..c6d4c69a 100644 --- a/src/api/management/ports/repositories.py +++ b/src/api/management/ports/repositories.py @@ -51,14 +51,18 @@ async def get_by_id( """ ... - async def find_by_tenant(self, tenant_id: str) -> list[KnowledgeGraph]: - """List all knowledge graphs belonging to a tenant. + async def find_by_tenant( + self, tenant_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[KnowledgeGraph], int]: + """List knowledge graphs with pagination. Args: tenant_id: The tenant to list knowledge graphs for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of KnowledgeGraph aggregates in the tenant + Tuple of (items for the requested page, total count) """ ... @@ -111,15 +115,17 @@ async def get_by_id(self, data_source_id: DataSourceId) -> DataSource | None: ... async def find_by_knowledge_graph( - self, knowledge_graph_id: str - ) -> list[DataSource]: - """List all data sources belonging to a knowledge graph. + self, knowledge_graph_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[DataSource], int]: + """List data sources with pagination. Args: knowledge_graph_id: The knowledge graph to list data sources for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of DataSource aggregates for the knowledge graph + Tuple of (items for the requested page, total count) """ ... diff --git a/src/api/management/presentation/data_sources/routes.py b/src/api/management/presentation/data_sources/routes.py index f996dd22..e19988aa 100644 --- a/src/api/management/presentation/data_sources/routes.py +++ b/src/api/management/presentation/data_sources/routes.py @@ -72,6 +72,13 @@ async def create_data_source( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.get( @@ -88,14 +95,14 @@ async def list_data_sources( ) -> DataSourceListResponse: """List data sources for a knowledge graph with pagination.""" try: - all_ds = await service.list_for_knowledge_graph( + data_sources, total = await service.list_for_knowledge_graph( user_id=current_user.user_id.value, kg_id=kg_id, + offset=offset, + limit=limit, ) - total = len(all_ds) - paginated = all_ds[offset : offset + limit] return DataSourceListResponse( - items=[DataSourceResponse.from_domain(ds) for ds in paginated], + items=[DataSourceResponse.from_domain(ds) for ds in data_sources], total=total, offset=offset, limit=limit, @@ -105,6 +112,13 @@ async def list_data_sources( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action", ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.get( @@ -131,6 +145,13 @@ async def get_data_source( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action", ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.patch( @@ -175,6 +196,13 @@ async def update_data_source( status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg, ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.delete( @@ -209,7 +237,7 @@ async def delete_data_source( except Exception: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to delete data source", + detail="An unexpected error occurred", ) @@ -241,3 +269,10 @@ async def trigger_sync( status_code=status.HTTP_404_NOT_FOUND, detail="Data source not found", ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) diff --git a/src/api/management/presentation/knowledge_graphs/routes.py b/src/api/management/presentation/knowledge_graphs/routes.py index 554ab726..635a0bb8 100644 --- a/src/api/management/presentation/knowledge_graphs/routes.py +++ b/src/api/management/presentation/knowledge_graphs/routes.py @@ -63,6 +63,13 @@ async def create_knowledge_graph( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.get( @@ -79,14 +86,14 @@ async def list_knowledge_graphs( ) -> KnowledgeGraphListResponse: """List knowledge graphs in a workspace with pagination.""" try: - all_kgs = await service.list_for_workspace( + kgs, total = await service.list_for_workspace( user_id=current_user.user_id.value, workspace_id=workspace_id, + offset=offset, + limit=limit, ) - total = len(all_kgs) - paginated = all_kgs[offset : offset + limit] return KnowledgeGraphListResponse( - items=[KnowledgeGraphResponse.from_domain(kg) for kg in paginated], + items=[KnowledgeGraphResponse.from_domain(kg) for kg in kgs], total=total, offset=offset, limit=limit, @@ -96,6 +103,13 @@ async def list_knowledge_graphs( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action", ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.get( @@ -122,6 +136,13 @@ async def get_knowledge_graph( status_code=status.HTTP_403_FORBIDDEN, detail="You do not have permission to perform this action", ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.patch( @@ -165,6 +186,13 @@ async def update_knowledge_graph( status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg, ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) @router.delete( @@ -199,5 +227,5 @@ async def delete_knowledge_graph( except Exception: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to delete knowledge graph", + detail="An unexpected error occurred", ) diff --git a/src/api/tests/integration/management/test_data_source_repository.py b/src/api/tests/integration/management/test_data_source_repository.py index 9f395c69..01c592d4 100644 --- a/src/api/tests/integration/management/test_data_source_repository.py +++ b/src/api/tests/integration/management/test_data_source_repository.py @@ -300,8 +300,11 @@ async def test_finds_data_sources_by_knowledge_graph( await data_source_repository.save(ds2) await data_source_repository.save(ds3) - results = await data_source_repository.find_by_knowledge_graph(kg1.id.value) + results, total = await data_source_repository.find_by_knowledge_graph( + kg1.id.value + ) + assert total == 2 assert len(results) == 2 result_ids = {r.id.value for r in results} assert ds1.id.value in result_ids @@ -314,9 +317,12 @@ async def test_returns_empty_for_kg_with_no_sources( clean_management_data, ): """Should return an empty list when KG has no data sources.""" - results = await data_source_repository.find_by_knowledge_graph("nonexistent") + results, total = await data_source_repository.find_by_knowledge_graph( + "nonexistent" + ) assert results == [] + assert total == 0 class TestDataSourceUniqueness: diff --git a/src/api/tests/integration/management/test_knowledge_graph_repository.py b/src/api/tests/integration/management/test_knowledge_graph_repository.py index 0d81b10b..826a55ab 100644 --- a/src/api/tests/integration/management/test_knowledge_graph_repository.py +++ b/src/api/tests/integration/management/test_knowledge_graph_repository.py @@ -374,8 +374,9 @@ async def test_finds_knowledge_graphs_by_tenant( await knowledge_graph_repository.save(kg_other) # Query for the test tenant - results = await knowledge_graph_repository.find_by_tenant(test_tenant) + results, total = await knowledge_graph_repository.find_by_tenant(test_tenant) + assert total == 2 assert len(results) == 2 result_ids = {r.id.value for r in results} assert kg1.id.value in result_ids @@ -388,9 +389,12 @@ async def test_returns_empty_list_for_tenant_with_no_graphs( clean_management_data, ): """Should return an empty list when tenant has no knowledge graphs.""" - results = await knowledge_graph_repository.find_by_tenant("nonexistent-tenant") + results, total = await knowledge_graph_repository.find_by_tenant( + "nonexistent-tenant" + ) assert results == [] + assert total == 0 class TestOutboxConsistency: diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index e1d044f3..2676f15f 100644 --- a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -375,7 +375,7 @@ async def test_list_checks_view_permission_on_kg( """list_for_knowledge_graph() checks VIEW on the KG.""" mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) - mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_ds_repo.find_by_knowledge_graph.return_value = ([], 0) await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) @@ -436,11 +436,14 @@ async def test_list_returns_data_sources( mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) ds1 = _make_ds(ds_id="ds-001") ds2 = _make_ds(ds_id="ds-002") - mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + mock_ds_repo.find_by_knowledge_graph.return_value = ([ds1, ds2], 2) - result = await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + result, total = await service.list_for_knowledge_graph( + user_id=user_id, kg_id=kg_id + ) assert len(result) == 2 + assert total == 2 mock_probe.data_sources_listed.assert_called_once_with( kg_id=kg_id, count=2, diff --git a/src/api/tests/unit/management/application/test_knowledge_graph_service.py b/src/api/tests/unit/management/application/test_knowledge_graph_service.py index 44e0923f..8bd5f610 100644 --- a/src/api/tests/unit/management/application/test_knowledge_graph_service.py +++ b/src/api/tests/unit/management/application/test_knowledge_graph_service.py @@ -362,11 +362,12 @@ async def test_list_uses_read_relationships_to_discover_kgs( ] mock_kg_repo.get_by_id.side_effect = [kg1, kg2] - result = await service.list_for_workspace( + result, total = await service.list_for_workspace( user_id=user_id, workspace_id=workspace_id ) assert len(result) == 2 + assert total == 2 mock_probe.knowledge_graphs_listed.assert_called_once_with( workspace_id=workspace_id, count=2, @@ -394,11 +395,12 @@ async def test_list_filters_by_tenant( ] mock_kg_repo.get_by_id.side_effect = [kg_own, kg_other] - result = await service.list_for_workspace( + result, total = await service.list_for_workspace( user_id=user_id, workspace_id=workspace_id ) assert len(result) == 1 + assert total == 1 assert result[0].id.value == "kg-001" @@ -537,7 +539,7 @@ async def test_delete_checks_manage_permission_on_kg( kg = _make_kg() mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = kg - mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_ds_repo.find_by_knowledge_graph.return_value = ([], 0) mock_kg_repo.delete.return_value = True await service.delete(user_id=user_id, kg_id=kg.id.value) @@ -593,7 +595,7 @@ async def test_delete_cascades_data_sources( ds2 = MagicMock() mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = kg - mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + mock_ds_repo.find_by_knowledge_graph.return_value = ([ds1, ds2], 2) mock_ds_repo.delete.return_value = True mock_kg_repo.delete.return_value = True @@ -614,7 +616,7 @@ async def test_delete_probes_success( kg = _make_kg() mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = kg - mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_ds_repo.find_by_knowledge_graph.return_value = ([], 0) mock_kg_repo.delete.return_value = True await service.delete(user_id=user_id, kg_id=kg.id.value) diff --git a/src/api/tests/unit/management/presentation/test_data_source_routes.py b/src/api/tests/unit/management/presentation/test_data_source_routes.py index 1234a0ce..583f5258 100644 --- a/src/api/tests/unit/management/presentation/test_data_source_routes.py +++ b/src/api/tests/unit/management/presentation/test_data_source_routes.py @@ -233,7 +233,7 @@ async def test_lists_successfully(self, client, mock_service): data_sources = [ _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(3) ] - mock_service.list_for_knowledge_graph.return_value = data_sources + mock_service.list_for_knowledge_graph.return_value = (data_sources, 3) resp = await client.get( f"/management/knowledge-graphs/{KG_ID}/data-sources", @@ -249,10 +249,11 @@ async def test_lists_successfully(self, client, mock_service): @pytest.mark.asyncio async def test_pagination_offset_limit(self, client, mock_service): """Test that offset and limit query params work correctly.""" + # Service returns only the paginated slice; total reflects full count data_sources = [ - _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(5) + _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(2) ] - mock_service.list_for_knowledge_graph.return_value = data_sources + mock_service.list_for_knowledge_graph.return_value = (data_sources, 5) resp = await client.get( f"/management/knowledge-graphs/{KG_ID}/data-sources?offset=2&limit=2", diff --git a/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py index cfb6b13b..726e522d 100644 --- a/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py +++ b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py @@ -182,7 +182,7 @@ class TestListKnowledgeGraphs: async def test_lists_successfully(self, client, mock_service): """Test successful list returns 200 with correct pagination.""" kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(3)] - mock_service.list_for_workspace.return_value = kgs + mock_service.list_for_workspace.return_value = (kgs, 3) resp = await client.get( f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", @@ -198,8 +198,9 @@ async def test_lists_successfully(self, client, mock_service): @pytest.mark.asyncio async def test_pagination_offset_limit(self, client, mock_service): """Test that offset and limit query params work correctly.""" - kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(5)] - mock_service.list_for_workspace.return_value = kgs + # Service returns only the paginated slice; total reflects full count + kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(2)] + mock_service.list_for_workspace.return_value = (kgs, 5) resp = await client.get( f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs?offset=1&limit=2", @@ -226,7 +227,7 @@ async def test_unauthorized_returns_403(self, client, mock_service): @pytest.mark.asyncio async def test_empty_list(self, client, mock_service): """Test listing returns empty result correctly.""" - mock_service.list_for_workspace.return_value = [] + mock_service.list_for_workspace.return_value = ([], 0) resp = await client.get( f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", From 634108b51900515d9df55dd279b0ed51dc85a8be Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:52:59 -0400 Subject: [PATCH 7/9] fix(management): fix integration tests for pagination return type and transaction handling - Update repository integration tests to wrap all raw SQL operations in async with session.begin() blocks instead of bare execute+commit - Fix transaction-already-begun errors by ensuring every database operation uses explicit transaction boundaries - Fix conftest clean_management_data fixture to use begin() context manager Co-Authored-By: Claude Opus 4.6 --- .../tests/integration/management/conftest.py | 22 +-- .../management/test_data_source_repository.py | 34 ++--- .../test_knowledge_graph_repository.py | 125 +++++++++--------- 3 files changed, 94 insertions(+), 87 deletions(-) diff --git a/src/api/tests/integration/management/conftest.py b/src/api/tests/integration/management/conftest.py index 7dbba4d6..e3f60a8a 100644 --- a/src/api/tests/integration/management/conftest.py +++ b/src/api/tests/integration/management/conftest.py @@ -99,17 +99,19 @@ async def clean_management_data( async def cleanup() -> None: try: - await mgmt_async_session.execute(text("DELETE FROM data_source_sync_runs")) - await mgmt_async_session.execute(text("DELETE FROM data_sources")) - await mgmt_async_session.execute(text("DELETE FROM knowledge_graphs")) - # Clean outbox entries related to management - await mgmt_async_session.execute( - text( - "DELETE FROM outbox WHERE aggregate_type IN " - "('KnowledgeGraph', 'DataSource')" + async with mgmt_async_session.begin(): + await mgmt_async_session.execute( + text("DELETE FROM data_source_sync_runs") + ) + await mgmt_async_session.execute(text("DELETE FROM data_sources")) + await mgmt_async_session.execute(text("DELETE FROM knowledge_graphs")) + # Clean outbox entries related to management + await mgmt_async_session.execute( + text( + "DELETE FROM outbox WHERE aggregate_type IN " + "('KnowledgeGraph', 'DataSource')" + ) ) - ) - await mgmt_async_session.commit() except Exception: await mgmt_async_session.rollback() raise diff --git a/src/api/tests/integration/management/test_data_source_repository.py b/src/api/tests/integration/management/test_data_source_repository.py index 01c592d4..a9c22c41 100644 --- a/src/api/tests/integration/management/test_data_source_repository.py +++ b/src/api/tests/integration/management/test_data_source_repository.py @@ -396,8 +396,8 @@ async def test_save_records_outbox_event( await knowledge_graph_repository.save(kg) # Clear outbox of the KG create event - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) ds = DataSource.create( knowledge_graph_id=kg.id.value, @@ -410,13 +410,14 @@ async def test_save_records_outbox_event( async with async_session.begin(): await data_source_repository.save(ds) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'data_source'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'data_source'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "data_source" @@ -455,21 +456,22 @@ async def test_delete_records_outbox_event( await data_source_repository.save(ds) # Clear outbox of the create events so we isolate the delete event - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) ds.mark_for_deletion() async with async_session.begin(): await data_source_repository.delete(ds) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'data_source'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'data_source'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "data_source" diff --git a/src/api/tests/integration/management/test_knowledge_graph_repository.py b/src/api/tests/integration/management/test_knowledge_graph_repository.py index 826a55ab..e44b88bb 100644 --- a/src/api/tests/integration/management/test_knowledge_graph_repository.py +++ b/src/api/tests/integration/management/test_knowledge_graph_repository.py @@ -130,21 +130,22 @@ async def test_update_records_outbox_event( await knowledge_graph_repository.save(kg) # Clear outbox of the create event so we can isolate the update event - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) kg.update(name="After Update", description="After") async with async_session.begin(): await knowledge_graph_repository.save(kg) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "knowledge_graph" @@ -245,23 +246,23 @@ async def test_delete_kg_with_data_sources_raises_integrity_error( from ulid import ULID ds_id = str(ULID()) - await async_session.execute( - text( - "INSERT INTO data_sources " - "(id, knowledge_graph_id, tenant_id, name, adapter_type, " - "connection_config, schedule_type, created_at, updated_at) " - "VALUES (:id, :kg_id, :tid, :name, :adapter, " - "'{}'::jsonb, 'MANUAL', NOW(), NOW())" - ), - { - "id": ds_id, - "kg_id": kg.id.value, - "tid": test_tenant, - "name": "child-ds", - "adapter": DataSourceAdapterType.GITHUB.value, - }, - ) - await async_session.commit() + async with async_session.begin(): + await async_session.execute( + text( + "INSERT INTO data_sources " + "(id, knowledge_graph_id, tenant_id, name, adapter_type, " + "connection_config, schedule_type, created_at, updated_at) " + "VALUES (:id, :kg_id, :tid, :name, :adapter, " + "'{}'::jsonb, 'MANUAL', NOW(), NOW())" + ), + { + "id": ds_id, + "kg_id": kg.id.value, + "tid": test_tenant, + "name": "child-ds", + "adapter": DataSourceAdapterType.GITHUB.value, + }, + ) kg.mark_for_deletion() @@ -341,26 +342,26 @@ async def test_finds_knowledge_graphs_by_tenant( other_tenant_id = str(ULID()) other_workspace_id = str(ULID()) - await async_session.execute( - text( - "INSERT INTO tenants (id, name, created_at, updated_at) " - "VALUES (:id, :name, NOW(), NOW())" - ), - {"id": other_tenant_id, "name": f"other-tenant-{other_tenant_id}"}, - ) - await async_session.execute( - text( - "INSERT INTO workspaces (id, tenant_id, name, is_root, created_at, updated_at) " - "VALUES (:id, :tenant_id, :name, :is_root, NOW(), NOW())" - ), - { - "id": other_workspace_id, - "tenant_id": other_tenant_id, - "name": f"other-workspace-{other_workspace_id}", - "is_root": True, - }, - ) - await async_session.commit() + async with async_session.begin(): + await async_session.execute( + text( + "INSERT INTO tenants (id, name, created_at, updated_at) " + "VALUES (:id, :name, NOW(), NOW())" + ), + {"id": other_tenant_id, "name": f"other-tenant-{other_tenant_id}"}, + ) + await async_session.execute( + text( + "INSERT INTO workspaces (id, tenant_id, name, is_root, created_at, updated_at) " + "VALUES (:id, :tenant_id, :name, :is_root, NOW(), NOW())" + ), + { + "id": other_workspace_id, + "tenant_id": other_tenant_id, + "name": f"other-workspace-{other_workspace_id}", + "is_root": True, + }, + ) # Create 1 KG in the other tenant kg_other = KnowledgeGraph.create( @@ -411,8 +412,8 @@ async def test_save_records_outbox_event( ): """Should record a KnowledgeGraphCreated event in the outbox table.""" # Clear any pre-existing outbox entries - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) kg = KnowledgeGraph.create( tenant_id=test_tenant, @@ -424,13 +425,14 @@ async def test_save_records_outbox_event( async with async_session.begin(): await knowledge_graph_repository.save(kg) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "knowledge_graph" @@ -458,8 +460,8 @@ async def test_delete_records_outbox_event( await knowledge_graph_repository.save(kg) # Clear outbox of the create event so we can isolate the delete event - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) async with async_session.begin(): retrieved = await knowledge_graph_repository.get_by_id(kg.id) @@ -468,13 +470,14 @@ async def test_delete_records_outbox_event( retrieved.mark_for_deletion() await knowledge_graph_repository.delete(retrieved) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "knowledge_graph" From 5b7d0a662cf25fa57a5b8a6f81a146c53f7eab36 Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:59:47 -0400 Subject: [PATCH 8/9] fix(management): use aggregate method for credential updates and add error to SyncRunResponse - Route credential-only updates through ds.update_connection() instead of directly mutating ds.credentials_path, ensuring DataSourceUpdated event emission and updated_at timestamp update - Add error field to SyncRunResponse so failed syncs expose their error message Co-Authored-By: Claude Opus 4.6 --- .../application/services/data_source_service.py | 8 +++++++- src/api/management/presentation/data_sources/models.py | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index 8560e6c6..63dbaf79 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -348,7 +348,13 @@ async def update( tenant_id=self._scope_to_tenant, credentials=raw_credentials, ) - ds.credentials_path = cred_path + # Update via aggregate method to emit event and update timestamps + ds.update_connection( + name=ds.name, + connection_config=ds.connection_config, + credentials_path=cred_path, + updated_by=user_id, + ) await self._ds_repo.save(ds) except IntegrityError as e: diff --git a/src/api/management/presentation/data_sources/models.py b/src/api/management/presentation/data_sources/models.py index 4d4c32dc..aeb6f258 100644 --- a/src/api/management/presentation/data_sources/models.py +++ b/src/api/management/presentation/data_sources/models.py @@ -133,6 +133,7 @@ class SyncRunResponse(BaseModel): status: str started_at: datetime completed_at: datetime | None + error: str | None created_at: datetime @classmethod @@ -151,5 +152,6 @@ def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse: status=sync_run.status, started_at=sync_run.started_at, completed_at=sync_run.completed_at, + error=sync_run.error, created_at=sync_run.created_at, ) From 800e74939d9e76201176683e8a30bdb7b8c4b54e Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 13:21:04 -0400 Subject: [PATCH 9/9] fix(management): fix cascade delete loop and outbox cleanup case mismatch - Replace limit=10000 magic number with batched while-loop (100 per batch) for cascade deleting data sources when a knowledge graph is deleted - Fix outbox cleanup SQL to use snake_case aggregate types matching what repositories actually write ('knowledge_graph', 'data_source') Co-Authored-By: Claude Opus 4.6 --- .../services/knowledge_graph_service.py | 17 ++++++++++------- .../tests/integration/management/conftest.py | 2 +- src/api/uv.lock | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py index d037eeca..ca578910 100644 --- a/src/api/management/application/services/knowledge_graph_service.py +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -385,14 +385,17 @@ async def delete( return False async with self._session.begin(): - # Cascade delete data sources if repo is available + # Cascade delete all data sources before deleting KG if self._ds_repo is not None: - data_sources, _ = await self._ds_repo.find_by_knowledge_graph( - kg_id, offset=0, limit=10000 - ) - for ds in data_sources: - ds.mark_for_deletion(deleted_by=user_id) - await self._ds_repo.delete(ds) + while True: + batch, _ = await self._ds_repo.find_by_knowledge_graph( + kg_id, offset=0, limit=100 + ) + if not batch: + break + for ds in batch: + ds.mark_for_deletion(deleted_by=user_id) + await self._ds_repo.delete(ds) kg.mark_for_deletion(deleted_by=user_id) await self._kg_repo.delete(kg) diff --git a/src/api/tests/integration/management/conftest.py b/src/api/tests/integration/management/conftest.py index e3f60a8a..73ddf1a4 100644 --- a/src/api/tests/integration/management/conftest.py +++ b/src/api/tests/integration/management/conftest.py @@ -109,7 +109,7 @@ async def cleanup() -> None: await mgmt_async_session.execute( text( "DELETE FROM outbox WHERE aggregate_type IN " - "('KnowledgeGraph', 'DataSource')" + "('knowledge_graph', 'data_source')" ) ) except Exception: diff --git a/src/api/uv.lock b/src/api/uv.lock index b085bb5d..a6fec3b0 100644 --- a/src/api/uv.lock +++ b/src/api/uv.lock @@ -1171,7 +1171,7 @@ wheels = [ [[package]] name = "kartograph-api" -version = "3.30.0" +version = "3.31.0" source = { virtual = "." } dependencies = [ { name = "alembic" },