diff --git a/src/api/main.py b/src/api/main.py index 50153a8a..2404cd47 100644 --- a/src/api/main.py +++ b/src/api/main.py @@ -11,6 +11,7 @@ from graph.infrastructure.age_client import AgeGraphClient from graph.presentation import routes as graph_routes from iam.presentation import router as iam_router +from management.presentation import management_router from infrastructure.database.dependencies import ( close_database_engines, init_database_engines, @@ -216,6 +217,9 @@ async def kartograph_lifespan(app: FastAPI): # Include IAM bounded context routes app.include_router(iam_router) +# Include Management bounded context routes +app.include_router(management_router) + # Include dev utility routes (easy to remove for production) app.include_router(dev_routes.router) diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index 0533c1fc..63dbaf79 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -8,6 +8,7 @@ from datetime import UTC, datetime +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from ulid import ULID @@ -18,7 +19,7 @@ from management.domain.aggregates import DataSource from management.domain.entities import DataSourceSyncRun from management.domain.value_objects import DataSourceId, KnowledgeGraphId -from management.ports.exceptions import UnauthorizedError +from management.ports.exceptions import DuplicateDataSourceNameError, UnauthorizedError from management.ports.repositories import ( IDataSourceRepository, IDataSourceSyncRunRepository, @@ -150,26 +151,36 @@ async def create( if kg.tenant_id != self._scope_to_tenant: raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant") - async with self._session.begin(): - ds = DataSource.create( - knowledge_graph_id=kg_id, - tenant_id=self._scope_to_tenant, - name=name, - 
adapter_type=adapter_type, - connection_config=connection_config, - created_by=user_id, - ) - - if raw_credentials is not None: - cred_path = f"datasource/{ds.id.value}/credentials" - await self._secret_store.store( - path=cred_path, + try: + async with self._session.begin(): + ds = DataSource.create( + knowledge_graph_id=kg_id, tenant_id=self._scope_to_tenant, - credentials=raw_credentials, + name=name, + adapter_type=adapter_type, + connection_config=connection_config, + created_by=user_id, ) - ds.credentials_path = cred_path - await self._ds_repo.save(ds) + if raw_credentials is not None: + cred_path = f"datasource/{ds.id.value}/credentials" + await self._secret_store.store( + path=cred_path, + tenant_id=self._scope_to_tenant, + credentials=raw_credentials, + ) + ds.credentials_path = cred_path + + await self._ds_repo.save(ds) + except IntegrityError as e: + if "uq_data_sources_kg_name" in str(e): + self._probe.data_source_creation_failed( + kg_id=kg_id, name=name, error="duplicate name" + ) + raise DuplicateDataSourceNameError( + f"Data source '{name}' already exists in knowledge graph '{kg_id}'" + ) from e + raise self._probe.data_source_created( ds_id=ds.id.value, @@ -219,15 +230,20 @@ async def list_for_knowledge_graph( self, user_id: str, kg_id: str, - ) -> list[DataSource]: - """List data sources for a knowledge graph. + *, + offset: int = 0, + limit: int = 20, + ) -> tuple[list[DataSource], int]: + """List data sources for a knowledge graph with pagination. 
Args: user_id: The user requesting the list kg_id: The knowledge graph to list DSes for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of DataSource aggregates + Tuple of (paginated DataSource aggregates, total count) Raises: UnauthorizedError: If user lacks VIEW permission on KG @@ -254,14 +270,16 @@ async def list_for_knowledge_graph( if kg is None or kg.tenant_id != self._scope_to_tenant: raise UnauthorizedError(f"Knowledge graph {kg_id} not accessible") - data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id) + data_sources, total = await self._ds_repo.find_by_knowledge_graph( + kg_id, offset=offset, limit=limit + ) self._probe.data_sources_listed( kg_id=kg_id, count=len(data_sources), ) - return data_sources + return data_sources, total async def update( self, @@ -311,27 +329,45 @@ async def update( if ds.tenant_id != self._scope_to_tenant: raise ValueError(f"Data source {ds_id} not found") - async with self._session.begin(): - if name is not None or connection_config is not None: - ds.update_connection( - name=name if name is not None else ds.name, - connection_config=connection_config - if connection_config is not None - else ds.connection_config, - credentials_path=ds.credentials_path, - updated_by=user_id, - ) - - if raw_credentials is not None: - cred_path = f"datasource/{ds.id.value}/credentials" - await self._secret_store.store( - path=cred_path, - tenant_id=self._scope_to_tenant, - credentials=raw_credentials, + try: + async with self._session.begin(): + if name is not None or connection_config is not None: + ds.update_connection( + name=name if name is not None else ds.name, + connection_config=connection_config + if connection_config is not None + else ds.connection_config, + credentials_path=ds.credentials_path, + updated_by=user_id, + ) + + if raw_credentials is not None: + cred_path = f"datasource/{ds.id.value}/credentials" + await self._secret_store.store( + path=cred_path, + 
tenant_id=self._scope_to_tenant, + credentials=raw_credentials, + ) + # Update via aggregate method to emit event and update timestamps + ds.update_connection( + name=ds.name, + connection_config=ds.connection_config, + credentials_path=cred_path, + updated_by=user_id, + ) + + await self._ds_repo.save(ds) + except IntegrityError as e: + if "uq_data_sources_kg_name" in str(e): + self._probe.data_source_creation_failed( + kg_id=ds.knowledge_graph_id, + name=name or ds.name, + error="duplicate name", ) - ds.credentials_path = cred_path - - await self._ds_repo.save(ds) + raise DuplicateDataSourceNameError( + f"Data source '{name or ds.name}' already exists in knowledge graph '{ds.knowledge_graph_id}'" + ) from e + raise if name is not None: self._probe.data_source_updated(ds_id=ds_id, name=name) diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py index ca187f04..ca578910 100644 --- a/src/api/management/application/services/knowledge_graph_service.py +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -200,18 +200,24 @@ async def list_for_workspace( self, user_id: str, workspace_id: str, - ) -> list[KnowledgeGraph]: - """List knowledge graphs in a workspace. + *, + offset: int = 0, + limit: int = 20, + ) -> tuple[list[KnowledgeGraph], int]: + """List knowledge graphs in a workspace with pagination. Uses read_relationships to discover KG IDs linked to the workspace, then fetches each from the repository and filters by tenant. + Pagination is applied after filtering. 
Args: user_id: The user requesting the list workspace_id: The workspace to list KGs for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of KnowledgeGraph aggregates + Tuple of (paginated KnowledgeGraph aggregates, total count) Raises: UnauthorizedError: If user lacks VIEW permission on workspace @@ -250,33 +256,37 @@ async def list_for_workspace( kg_ids.append(parts[1]) # Fetch each KG from repo and filter by tenant + # (N+1 problem - acceptable for walking skeleton) kgs: list[KnowledgeGraph] = [] for kg_id in kg_ids: kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) if kg is not None and kg.tenant_id == self._scope_to_tenant: kgs.append(kg) + total = len(kgs) + paginated = kgs[offset : offset + limit] + self._probe.knowledge_graphs_listed( workspace_id=workspace_id, - count=len(kgs), + count=len(paginated), ) - return kgs + return paginated, total async def update( self, user_id: str, kg_id: str, - name: str, - description: str, + name: str | None = None, + description: str | None = None, ) -> KnowledgeGraph: """Update a knowledge graph's metadata. 
Args: user_id: The user performing the update kg_id: The knowledge graph ID - name: New name - description: New description + name: Optional new name (uses existing if None) + description: Optional new description (uses existing if None) Returns: The updated KnowledgeGraph aggregate @@ -310,17 +320,26 @@ async def update( if kg.tenant_id != self._scope_to_tenant: raise ValueError(f"Knowledge graph {kg_id} not found") - kg.update(name=name, description=description, updated_by=user_id) + resolved_name = name if name is not None else kg.name + resolved_description = ( + description if description is not None else kg.description + ) + + kg.update( + name=resolved_name, + description=resolved_description, + updated_by=user_id, + ) try: async with self._session.begin(): await self._kg_repo.save(kg) except IntegrityError as e: raise DuplicateKnowledgeGraphNameError( - f"Knowledge graph '{name}' already exists in tenant" + f"Knowledge graph '{resolved_name}' already exists in tenant" ) from e - self._probe.knowledge_graph_updated(kg_id=kg_id, name=name) + self._probe.knowledge_graph_updated(kg_id=kg_id, name=resolved_name) return kg @@ -366,12 +385,17 @@ async def delete( return False async with self._session.begin(): - # Cascade delete data sources if repo is available + # Cascade delete all data sources before deleting KG if self._ds_repo is not None: - data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id) - for ds in data_sources: - ds.mark_for_deletion(deleted_by=user_id) - await self._ds_repo.delete(ds) + while True: + batch, _ = await self._ds_repo.find_by_knowledge_graph( + kg_id, offset=0, limit=100 + ) + if not batch: + break + for ds in batch: + ds.mark_for_deletion(deleted_by=user_id) + await self._ds_repo.delete(ds) kg.mark_for_deletion(deleted_by=user_id) await self._kg_repo.delete(kg) diff --git a/src/api/management/infrastructure/repositories/data_source_repository.py b/src/api/management/infrastructure/repositories/data_source_repository.py 
index 7901a1f0..05aff7fc 100644 --- a/src/api/management/infrastructure/repositories/data_source_repository.py +++ b/src/api/management/infrastructure/repositories/data_source_repository.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession @@ -128,17 +128,31 @@ async def get_by_id(self, data_source_id: DataSourceId) -> DataSource | None: return self._to_domain(model) async def find_by_knowledge_graph( - self, knowledge_graph_id: str - ) -> list[DataSource]: - stmt = select(DataSourceModel).where( - DataSourceModel.knowledge_graph_id == knowledge_graph_id + self, knowledge_graph_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[DataSource], int]: + # Count query + count_stmt = ( + select(func.count()) + .select_from(DataSourceModel) + .where(DataSourceModel.knowledge_graph_id == knowledge_graph_id) + ) + count_result = await self._session.execute(count_stmt) + total = count_result.scalar_one() + + # Paginated query + stmt = ( + select(DataSourceModel) + .where(DataSourceModel.knowledge_graph_id == knowledge_graph_id) + .offset(offset) + .limit(limit) + .order_by(DataSourceModel.created_at.desc()) ) result = await self._session.execute(stmt) models = result.scalars().all() data_sources = [self._to_domain(model) for model in models] self._probe.data_sources_listed(knowledge_graph_id, len(data_sources)) - return data_sources + return data_sources, total async def delete(self, data_source: DataSource) -> bool: stmt = select(DataSourceModel).where(DataSourceModel.id == data_source.id.value) diff --git a/src/api/management/infrastructure/repositories/knowledge_graph_repository.py b/src/api/management/infrastructure/repositories/knowledge_graph_repository.py index f7b0b8c4..1d4bf059 100644 --- a/src/api/management/infrastructure/repositories/knowledge_graph_repository.py +++ 
b/src/api/management/infrastructure/repositories/knowledge_graph_repository.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession @@ -121,16 +121,32 @@ async def get_by_id( self._probe.knowledge_graph_retrieved(knowledge_graph_id.value) return self._to_domain(model) - async def find_by_tenant(self, tenant_id: str) -> list[KnowledgeGraph]: - stmt = select(KnowledgeGraphModel).where( - KnowledgeGraphModel.tenant_id == tenant_id + async def find_by_tenant( + self, tenant_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[KnowledgeGraph], int]: + # Count query + count_stmt = ( + select(func.count()) + .select_from(KnowledgeGraphModel) + .where(KnowledgeGraphModel.tenant_id == tenant_id) + ) + count_result = await self._session.execute(count_stmt) + total = count_result.scalar_one() + + # Paginated query + stmt = ( + select(KnowledgeGraphModel) + .where(KnowledgeGraphModel.tenant_id == tenant_id) + .offset(offset) + .limit(limit) + .order_by(KnowledgeGraphModel.created_at.desc()) ) result = await self._session.execute(stmt) models = result.scalars().all() kgs = [self._to_domain(model) for model in models] self._probe.knowledge_graphs_listed(tenant_id, len(kgs)) - return kgs + return kgs, total async def delete(self, knowledge_graph: KnowledgeGraph) -> bool: stmt = select(KnowledgeGraphModel).where( diff --git a/src/api/management/ports/repositories.py b/src/api/management/ports/repositories.py index 3a634699..c6d4c69a 100644 --- a/src/api/management/ports/repositories.py +++ b/src/api/management/ports/repositories.py @@ -51,14 +51,18 @@ async def get_by_id( """ ... - async def find_by_tenant(self, tenant_id: str) -> list[KnowledgeGraph]: - """List all knowledge graphs belonging to a tenant. 
+ async def find_by_tenant( + self, tenant_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[KnowledgeGraph], int]: + """List knowledge graphs with pagination. Args: tenant_id: The tenant to list knowledge graphs for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of KnowledgeGraph aggregates in the tenant + Tuple of (items for the requested page, total count) """ ... @@ -111,15 +115,17 @@ async def get_by_id(self, data_source_id: DataSourceId) -> DataSource | None: ... async def find_by_knowledge_graph( - self, knowledge_graph_id: str - ) -> list[DataSource]: - """List all data sources belonging to a knowledge graph. + self, knowledge_graph_id: str, *, offset: int = 0, limit: int = 20 + ) -> tuple[list[DataSource], int]: + """List data sources with pagination. Args: knowledge_graph_id: The knowledge graph to list data sources for + offset: Number of records to skip + limit: Maximum number of records to return Returns: - List of DataSource aggregates for the knowledge graph + Tuple of (items for the requested page, total count) """ ... diff --git a/src/api/management/presentation/__init__.py b/src/api/management/presentation/__init__.py new file mode 100644 index 00000000..0135f72a --- /dev/null +++ b/src/api/management/presentation/__init__.py @@ -0,0 +1,22 @@ +"""Management presentation layer. + +Aggregates sub-routers for knowledge graphs and data sources, +exporting a single management_router for registration in main.py. 
+""" + +from __future__ import annotations + +from fastapi import APIRouter + +from management.presentation.data_sources.routes import router as ds_router +from management.presentation.knowledge_graphs.routes import router as kg_router + +management_router = APIRouter( + prefix="/management", + tags=["management"], +) + +management_router.include_router(kg_router) +management_router.include_router(ds_router) + +__all__ = ["management_router"] diff --git a/src/api/management/presentation/data_sources/__init__.py b/src/api/management/presentation/data_sources/__init__.py new file mode 100644 index 00000000..67042ba5 --- /dev/null +++ b/src/api/management/presentation/data_sources/__init__.py @@ -0,0 +1 @@ +"""Data source presentation sub-package.""" diff --git a/src/api/management/presentation/data_sources/models.py b/src/api/management/presentation/data_sources/models.py new file mode 100644 index 00000000..aeb6f258 --- /dev/null +++ b/src/api/management/presentation/data_sources/models.py @@ -0,0 +1,157 @@ +"""Request and response models for Data Source API endpoints.""" + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + +from management.domain.aggregates import DataSource +from management.domain.entities import DataSourceSyncRun + + +class CreateDataSourceRequest(BaseModel): + """Request to create a data source. + + Attributes: + name: Data source name (1-100 characters) + adapter_type: Adapter type string (validated against DataSourceAdapterType in route) + connection_config: Key-value connection configuration + credentials: Optional write-only credentials (never returned in responses) + """ + + name: str = Field(min_length=1, max_length=100) + adapter_type: str + connection_config: dict[str, str] + credentials: dict[str, str] | None = None + + +class UpdateDataSourceRequest(BaseModel): + """Request to partially update a data source. 
+ + Attributes: + name: Optional new name (1-100 characters) + connection_config: Optional new connection configuration + credentials: Optional new credentials (write-only) + """ + + name: str | None = Field(default=None, min_length=1, max_length=100) + connection_config: dict[str, str] | None = None + credentials: dict[str, str] | None = None + + +class DataSourceResponse(BaseModel): + """Response containing data source details. + + Credentials are never returned. Instead, has_credentials indicates + whether credentials have been configured. + + Attributes: + id: Data source ID (ULID) + knowledge_graph_id: Parent knowledge graph ID + tenant_id: Tenant ID this data source belongs to + name: Data source name + adapter_type: Adapter type string + connection_config: Connection configuration key-value pairs + has_credentials: Whether credentials are configured + schedule_type: Schedule type (manual, cron, interval) + schedule_value: Schedule expression (None for manual) + last_sync_at: Last successful sync timestamp + created_at: Creation timestamp + updated_at: Last update timestamp + """ + + id: str + knowledge_graph_id: str + tenant_id: str + name: str + adapter_type: str + connection_config: dict[str, str] + has_credentials: bool + schedule_type: str + schedule_value: str | None + last_sync_at: datetime | None + created_at: datetime + updated_at: datetime + + @classmethod + def from_domain(cls, ds: DataSource) -> DataSourceResponse: + """Convert domain DataSource aggregate to API response. 
+ + Args: + ds: DataSource domain aggregate + + Returns: + DataSourceResponse with data source details + """ + return cls( + id=ds.id.value, + knowledge_graph_id=ds.knowledge_graph_id, + tenant_id=ds.tenant_id, + name=ds.name, + adapter_type=ds.adapter_type.value, + connection_config=ds.connection_config, + has_credentials=ds.credentials_path is not None, + schedule_type=ds.schedule.schedule_type.value, + schedule_value=ds.schedule.value, + last_sync_at=ds.last_sync_at, + created_at=ds.created_at, + updated_at=ds.updated_at, + ) + + +class DataSourceListResponse(BaseModel): + """Response containing a paginated list of data sources. + + Attributes: + items: List of data source details + total: Total number of data sources (before pagination) + offset: Number of items skipped + limit: Maximum number of items returned + """ + + items: list[DataSourceResponse] + total: int + offset: int + limit: int + + +class SyncRunResponse(BaseModel): + """Response containing sync run details. + + Attributes: + id: Sync run ID + data_source_id: Data source this sync belongs to + status: Sync run status (pending, running, completed, failed) + started_at: Sync start timestamp + completed_at: Sync completion timestamp (None if not complete) + created_at: Record creation timestamp + """ + + id: str + data_source_id: str + status: str + started_at: datetime + completed_at: datetime | None + error: str | None + created_at: datetime + + @classmethod + def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse: + """Convert domain DataSourceSyncRun entity to API response. 
+ + Args: + sync_run: DataSourceSyncRun domain entity + + Returns: + SyncRunResponse with sync run details + """ + return cls( + id=sync_run.id, + data_source_id=sync_run.data_source_id, + status=sync_run.status, + started_at=sync_run.started_at, + completed_at=sync_run.completed_at, + error=sync_run.error, + created_at=sync_run.created_at, + ) diff --git a/src/api/management/presentation/data_sources/routes.py b/src/api/management/presentation/data_sources/routes.py new file mode 100644 index 00000000..e19988aa --- /dev/null +++ b/src/api/management/presentation/data_sources/routes.py @@ -0,0 +1,278 @@ +"""Data source management routes.""" + +from __future__ import annotations + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query, status + +from iam.application.value_objects import CurrentUser +from iam.dependencies.user import get_current_user +from management.application.services.data_source_service import DataSourceService +from management.dependencies.data_source import get_data_source_service +from management.ports.exceptions import ( + DuplicateDataSourceNameError, + UnauthorizedError, +) +from management.presentation.data_sources.models import ( + CreateDataSourceRequest, + DataSourceListResponse, + DataSourceResponse, + SyncRunResponse, + UpdateDataSourceRequest, +) +from shared_kernel.datasource_types import DataSourceAdapterType + +router = APIRouter(tags=["Data Sources"]) + + +@router.post( + "/knowledge-graphs/{kg_id}/data-sources", + response_model=DataSourceResponse, + status_code=status.HTTP_201_CREATED, + summary="Create a data source", +) +async def create_data_source( + kg_id: str, + request: CreateDataSourceRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> DataSourceResponse: + """Create a new data source in a knowledge graph.""" + try: + adapter_type = DataSourceAdapterType(request.adapter_type) + 
except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid adapter type: {request.adapter_type}", + ) + + try: + ds = await service.create( + user_id=current_user.user_id.value, + kg_id=kg_id, + name=request.name, + adapter_type=adapter_type, + connection_config=request.connection_config, + raw_credentials=request.credentials, + ) + return DataSourceResponse.from_domain(ds) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateDataSourceNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A data source with this name already exists in this knowledge graph", + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.get( + "/knowledge-graphs/{kg_id}/data-sources", + response_model=DataSourceListResponse, + summary="List data sources for a knowledge graph", +) +async def list_data_sources( + kg_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], + offset: int = Query(default=0, ge=0), + limit: int = Query(default=20, ge=1, le=100), +) -> DataSourceListResponse: + """List data sources for a knowledge graph with pagination.""" + try: + data_sources, total = await service.list_for_knowledge_graph( + user_id=current_user.user_id.value, + kg_id=kg_id, + offset=offset, + limit=limit, + ) + return DataSourceListResponse( + items=[DataSourceResponse.from_domain(ds) for ds in data_sources], + total=total, + offset=offset, + limit=limit, + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You 
do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.get( + "/data-sources/{ds_id}", + response_model=DataSourceResponse, + summary="Get a data source by ID", +) +async def get_data_source( + ds_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> DataSourceResponse: + """Get a data source by ID.""" + try: + ds = await service.get(user_id=current_user.user_id.value, ds_id=ds_id) + if ds is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + return DataSourceResponse.from_domain(ds) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.patch( + "/data-sources/{ds_id}", + response_model=DataSourceResponse, + summary="Update a data source", +) +async def update_data_source( + ds_id: str, + request: UpdateDataSourceRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> DataSourceResponse: + """Update a data source's configuration.""" + try: + ds = await service.update( + user_id=current_user.user_id.value, + ds_id=ds_id, + name=request.name, + connection_config=request.connection_config, + raw_credentials=request.credentials, + ) + return DataSourceResponse.from_domain(ds) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except 
DuplicateDataSourceNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A data source with this name already exists in this knowledge graph", + ) + except ValueError as e: + error_msg = str(e) + if "not found" in error_msg: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=error_msg, + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.delete( + "/data-sources/{ds_id}", + status_code=status.HTTP_204_NO_CONTENT, + response_model=None, + summary="Delete a data source", +) +async def delete_data_source( + ds_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> None: + """Delete a data source.""" + try: + result = await service.delete( + user_id=current_user.user_id.value, + ds_id=ds_id, + ) + if not result: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.post( + "/data-sources/{ds_id}/sync", + response_model=SyncRunResponse, + status_code=status.HTTP_202_ACCEPTED, + summary="Trigger a data source sync", +) +async def trigger_sync( + ds_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[DataSourceService, Depends(get_data_source_service)], +) -> SyncRunResponse: + """Trigger a sync for a data source.""" + try: + sync_run = await 
service.trigger_sync( + user_id=current_user.user_id.value, + ds_id=ds_id, + ) + return SyncRunResponse.from_domain(sync_run) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except ValueError: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Data source not found", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) diff --git a/src/api/management/presentation/knowledge_graphs/__init__.py b/src/api/management/presentation/knowledge_graphs/__init__.py new file mode 100644 index 00000000..2f20c9b1 --- /dev/null +++ b/src/api/management/presentation/knowledge_graphs/__init__.py @@ -0,0 +1 @@ +"""Knowledge graph presentation sub-package.""" diff --git a/src/api/management/presentation/knowledge_graphs/models.py b/src/api/management/presentation/knowledge_graphs/models.py new file mode 100644 index 00000000..0a898362 --- /dev/null +++ b/src/api/management/presentation/knowledge_graphs/models.py @@ -0,0 +1,91 @@ +"""Request and response models for Knowledge Graph API endpoints.""" + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + +from management.domain.aggregates import KnowledgeGraph + + +class CreateKnowledgeGraphRequest(BaseModel): + """Request to create a knowledge graph. + + Attributes: + name: Knowledge graph name (1-100 characters) + description: Optional description (defaults to empty string) + """ + + name: str = Field(min_length=1, max_length=100) + description: str = Field(default="") + + +class UpdateKnowledgeGraphRequest(BaseModel): + """Request to partially update a knowledge graph. 
+ + Attributes: + name: Optional new name (1-100 characters) + description: Optional new description + """ + + name: str | None = Field(default=None, min_length=1, max_length=100) + description: str | None = Field(default=None) + + +class KnowledgeGraphResponse(BaseModel): + """Response containing knowledge graph details. + + Attributes: + id: Knowledge graph ID (ULID) + tenant_id: Tenant ID this knowledge graph belongs to + workspace_id: Workspace ID containing this knowledge graph + name: Knowledge graph name + description: Knowledge graph description + created_at: Creation timestamp + updated_at: Last update timestamp + """ + + id: str + tenant_id: str + workspace_id: str + name: str + description: str + created_at: datetime + updated_at: datetime + + @classmethod + def from_domain(cls, kg: KnowledgeGraph) -> KnowledgeGraphResponse: + """Convert domain KnowledgeGraph aggregate to API response. + + Args: + kg: KnowledgeGraph domain aggregate + + Returns: + KnowledgeGraphResponse with knowledge graph details + """ + return cls( + id=kg.id.value, + tenant_id=kg.tenant_id, + workspace_id=kg.workspace_id, + name=kg.name, + description=kg.description, + created_at=kg.created_at, + updated_at=kg.updated_at, + ) + + +class KnowledgeGraphListResponse(BaseModel): + """Response containing a paginated list of knowledge graphs. 
+ + Attributes: + items: List of knowledge graph details + total: Total number of knowledge graphs (before pagination) + offset: Number of items skipped + limit: Maximum number of items returned + """ + + items: list[KnowledgeGraphResponse] + total: int + offset: int + limit: int diff --git a/src/api/management/presentation/knowledge_graphs/routes.py b/src/api/management/presentation/knowledge_graphs/routes.py new file mode 100644 index 00000000..635a0bb8 --- /dev/null +++ b/src/api/management/presentation/knowledge_graphs/routes.py @@ -0,0 +1,231 @@ +"""Knowledge graph management routes.""" + +from __future__ import annotations + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query, status + +from iam.application.value_objects import CurrentUser +from iam.dependencies.user import get_current_user +from management.application.services.knowledge_graph_service import ( + KnowledgeGraphService, +) +from management.dependencies.knowledge_graph import get_knowledge_graph_service +from management.domain.exceptions import InvalidKnowledgeGraphNameError +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) +from management.presentation.knowledge_graphs.models import ( + CreateKnowledgeGraphRequest, + KnowledgeGraphListResponse, + KnowledgeGraphResponse, + UpdateKnowledgeGraphRequest, +) + +router = APIRouter(tags=["Knowledge Graphs"]) + + +@router.post( + "/workspaces/{workspace_id}/knowledge-graphs", + response_model=KnowledgeGraphResponse, + status_code=status.HTTP_201_CREATED, + summary="Create a knowledge graph", +) +async def create_knowledge_graph( + workspace_id: str, + request: CreateKnowledgeGraphRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> KnowledgeGraphResponse: + """Create a new knowledge graph in a workspace.""" + try: + kg = await service.create( + 
user_id=current_user.user_id.value, + workspace_id=workspace_id, + name=request.name, + description=request.description, + ) + return KnowledgeGraphResponse.from_domain(kg) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateKnowledgeGraphNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A knowledge graph with this name already exists", + ) + except (InvalidKnowledgeGraphNameError, ValueError) as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.get( + "/workspaces/{workspace_id}/knowledge-graphs", + response_model=KnowledgeGraphListResponse, + summary="List knowledge graphs in a workspace", +) +async def list_knowledge_graphs( + workspace_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], + offset: int = Query(default=0, ge=0), + limit: int = Query(default=20, ge=1, le=100), +) -> KnowledgeGraphListResponse: + """List knowledge graphs in a workspace with pagination.""" + try: + kgs, total = await service.list_for_workspace( + user_id=current_user.user_id.value, + workspace_id=workspace_id, + offset=offset, + limit=limit, + ) + return KnowledgeGraphListResponse( + items=[KnowledgeGraphResponse.from_domain(kg) for kg in kgs], + total=total, + offset=offset, + limit=limit, + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected 
error occurred", + ) + + +@router.get( + "/knowledge-graphs/{kg_id}", + response_model=KnowledgeGraphResponse, + summary="Get a knowledge graph by ID", +) +async def get_knowledge_graph( + kg_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> KnowledgeGraphResponse: + """Get a knowledge graph by ID.""" + try: + kg = await service.get(user_id=current_user.user_id.value, kg_id=kg_id) + if kg is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) + return KnowledgeGraphResponse.from_domain(kg) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.patch( + "/knowledge-graphs/{kg_id}", + response_model=KnowledgeGraphResponse, + summary="Update a knowledge graph", +) +async def update_knowledge_graph( + kg_id: str, + request: UpdateKnowledgeGraphRequest, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> KnowledgeGraphResponse: + """Update a knowledge graph's metadata.""" + try: + kg = await service.update( + user_id=current_user.user_id.value, + kg_id=kg_id, + name=request.name, + description=request.description, + ) + return KnowledgeGraphResponse.from_domain(kg) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except DuplicateKnowledgeGraphNameError: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="A knowledge graph with this name already exists", + ) + except 
(InvalidKnowledgeGraphNameError, ValueError) as e: + error_msg = str(e) + if "not found" in error_msg: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=error_msg, + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) + + +@router.delete( + "/knowledge-graphs/{kg_id}", + status_code=status.HTTP_204_NO_CONTENT, + response_model=None, + summary="Delete a knowledge graph", +) +async def delete_knowledge_graph( + kg_id: str, + current_user: Annotated[CurrentUser, Depends(get_current_user)], + service: Annotated[KnowledgeGraphService, Depends(get_knowledge_graph_service)], +) -> None: + """Delete a knowledge graph.""" + try: + result = await service.delete( + user_id=current_user.user_id.value, + kg_id=kg_id, + ) + if not result: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Knowledge graph not found", + ) + except UnauthorizedError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to perform this action", + ) + except HTTPException: + raise + except Exception: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred", + ) diff --git a/src/api/tests/integration/management/conftest.py b/src/api/tests/integration/management/conftest.py index 8167f93b..73ddf1a4 100644 --- a/src/api/tests/integration/management/conftest.py +++ b/src/api/tests/integration/management/conftest.py @@ -1,7 +1,6 @@ """Integration test fixtures for Management bounded context. -These fixtures require a running PostgreSQL instance. -Database settings follow the same pattern as IAM integration tests. +These fixtures require running PostgreSQL, SpiceDB, and Keycloak instances. 
""" from __future__ import annotations @@ -11,31 +10,33 @@ import pytest import pytest_asyncio +from asgi_lifespan import LifespanManager +from cryptography.fernet import Fernet +from httpx import ASGITransport, AsyncClient +from jose import jwt as jose_jwt from pydantic import SecretStr from sqlalchemy import text -from sqlalchemy.exc import ProgrammingError from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from ulid import ULID +from infrastructure.authorization_dependencies import get_spicedb_client from infrastructure.database.engines import create_write_engine -from infrastructure.outbox.repository import OutboxRepository from infrastructure.settings import DatabaseSettings -from management.infrastructure.repositories.data_source_repository import ( - DataSourceRepository, -) -from management.infrastructure.repositories.data_source_sync_run_repository import ( - DataSourceSyncRunRepository, -) -from management.infrastructure.repositories.knowledge_graph_repository import ( - KnowledgeGraphRepository, +from shared_kernel.authorization.protocols import AuthorizationProvider +from shared_kernel.authorization.types import ( + Permission, + ResourceType, + format_resource, + format_subject, ) +from tests.integration.iam.conftest import wait_for_permission -pytestmark = pytest.mark.integration +# Ensure encryption key is available for management services +os.environ.setdefault("KARTOGRAPH_MGMT_ENCRYPTION_KEY", Fernet.generate_key().decode()) @pytest.fixture(scope="session") -def management_db_settings() -> DatabaseSettings: - """Database settings for Management integration tests.""" +def mgmt_db_settings() -> DatabaseSettings: + """Database settings for management integration tests.""" return DatabaseSettings( host=os.getenv("KARTOGRAPH_DB_HOST", "localhost"), port=int(os.getenv("KARTOGRAPH_DB_PORT", "5432")), @@ -49,11 +50,11 @@ def management_db_settings() -> DatabaseSettings: @pytest_asyncio.fixture -async def async_session( - 
@pytest_asyncio.fixture
async def async_client():
    """Create async HTTP client for testing with lifespan support.

    Imports the app lazily so environment setup (e.g. the encryption key
    set at module import) takes effect before app construction.
    """
    from main import app

    async with LifespanManager(app):
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            yield client


@pytest.fixture
def spicedb_client() -> AuthorizationProvider:
    """Provide a SpiceDB client for integration tests."""
    return get_spicedb_client()


@pytest.fixture
def alice_user_id(alice_token: str) -> str:
    """Extract the actual user_id (sub claim) from alice's JWT token."""
    claims = jose_jwt.get_unverified_claims(alice_token)
    return claims["sub"]


@pytest_asyncio.fixture
async def clean_management_data(
    mgmt_async_session: AsyncSession,
) -> AsyncGenerator[None, None]:
    """Clean management tables before and after tests.

    Deletion order respects FK constraints:
    data_source_sync_runs -> data_sources -> knowledge_graphs,
    followed by management-related outbox entries
    ('knowledge_graph' / 'data_source' aggregates).
    """

    async def cleanup() -> None:
        try:
            async with mgmt_async_session.begin():
                await mgmt_async_session.execute(
                    text("DELETE FROM data_source_sync_runs")
                )
                await mgmt_async_session.execute(text("DELETE FROM data_sources"))
                await mgmt_async_session.execute(text("DELETE FROM knowledge_graphs"))
                # Clean outbox entries related to management
                await mgmt_async_session.execute(
                    text(
                        "DELETE FROM outbox WHERE aggregate_type IN "
                        "('knowledge_graph', 'data_source')"
                    )
                )
        except Exception:
            await mgmt_async_session.rollback()
            raise

    await cleanup()
    yield
    await cleanup()


async def grant_kg_permission(
    spicedb_client: AuthorizationProvider,
    user_id: str,
    kg_id: str,
    workspace_id: str,
) -> None:
    """Set up SpiceDB relationships for a KG.

    Writes the workspace parent relationship on the KG, then waits until
    the user's VIEW permission on the KG (inherited through the workspace)
    becomes visible. No direct per-user grant is written here; the user's
    access is assumed to flow from their existing workspace role.
    """
    kg_resource = format_resource(ResourceType.KNOWLEDGE_GRAPH, kg_id)
    user_subject = format_subject(ResourceType.USER, user_id)
    ws_subject = format_resource(ResourceType.WORKSPACE, workspace_id)

    # Write workspace parent relationship
    await spicedb_client.write_relationship(
        resource=kg_resource,
        relation="workspace",
        subject=ws_subject,
    )

    # Wait for view permission (inherited from workspace)
    view_ready = await wait_for_permission(
        spicedb_client,
        resource=kg_resource,
        permission=Permission.VIEW,
        subject=user_subject,
        timeout=5.0,
    )
    assert view_ready, "Timed out waiting for KG view permission"


async def grant_ds_permission(
    spicedb_client: AuthorizationProvider,
    user_id: str,
    ds_id: str,
    kg_id: str,
) -> None:
    """Set up SpiceDB relationships for a DataSource.

    Writes the knowledge_graph parent relationship on the DS so permissions
    inherit from the KG, then waits until the user's inherited VIEW
    permission on the DS becomes visible.
    """
    ds_resource = format_resource(ResourceType.DATA_SOURCE, ds_id)
    user_subject = format_subject(ResourceType.USER, user_id)
    kg_subject = format_resource(ResourceType.KNOWLEDGE_GRAPH, kg_id)

    # Write knowledge_graph parent relationship
    await spicedb_client.write_relationship(
        resource=ds_resource,
        relation="knowledge_graph",
        subject=kg_subject,
    )

    # Wait for view permission (inherited from KG)
    view_ready = await wait_for_permission(
        spicedb_client,
        resource=ds_resource,
        permission=Permission.VIEW,
        subject=user_subject,
        timeout=5.0,
    )
    assert view_ready, "Timed out waiting for DS view permission"
"""Integration tests for Data Source API endpoints.

Tests the full stack: HTTP request -> route -> DI -> service ->
SpiceDB + PostgreSQL -> response.
"""

from __future__ import annotations

import pytest
from httpx import AsyncClient

from shared_kernel.authorization.protocols import AuthorizationProvider
from tests.integration.management.conftest import (
    grant_ds_permission,
    grant_kg_permission,
)

pytestmark = [pytest.mark.integration, pytest.mark.keycloak]


async def _get_root_workspace_id(
    async_client: AsyncClient, tenant_auth_headers: dict
) -> str:
    """Return the root workspace ID for the default tenant."""
    response = await async_client.get("/iam/workspaces", headers=tenant_auth_headers)
    assert response.status_code == 200
    workspaces = response.json()["workspaces"]
    root = next((w for w in workspaces if w["is_root"]), None)
    assert root is not None, "Root workspace should exist"
    return root["id"]


async def _create_kg(
    async_client: AsyncClient,
    tenant_auth_headers: dict,
    spicedb_client: AuthorizationProvider,
    alice_user_id: str,
    workspace_id: str,
    name: str = "Test KG for DS",
) -> str:
    """Create a KG, set up its SpiceDB permissions, and return the KG ID."""
    response = await async_client.post(
        f"/management/workspaces/{workspace_id}/knowledge-graphs",
        headers=tenant_auth_headers,
        json={"name": name},
    )
    assert response.status_code == 201, f"Failed to create KG: {response.text}"
    kg_id = response.json()["id"]

    await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id)
    return kg_id


async def _post_data_source(
    async_client: AsyncClient,
    tenant_auth_headers: dict,
    kg_id: str,
    payload: dict,
):
    """POST a data source creation request under *kg_id*; return the response."""
    return await async_client.post(
        f"/management/knowledge-graphs/{kg_id}/data-sources",
        headers=tenant_auth_headers,
        json=payload,
    )


class TestDataSourceCreation:
    """Tests for POST /management/knowledge-graphs/{kg_id}/data-sources."""

    @pytest.mark.asyncio
    async def test_creates_data_source(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Creating a DS via the API returns 201 with the expected fields."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
        )

        response = await _post_data_source(
            async_client,
            tenant_auth_headers,
            kg_id,
            {
                "name": "Test DS",
                "adapter_type": "github",
                "connection_config": {"owner": "test", "repo": "test-repo"},
            },
        )

        assert response.status_code == 201, (
            f"Unexpected: {response.status_code} {response.text}"
        )
        body = response.json()
        assert body["name"] == "Test DS"
        assert body["adapter_type"] == "github"
        assert body["knowledge_graph_id"] == kg_id
        assert body["has_credentials"] is False
        assert body["schedule_type"] == "manual"
        assert "id" in body

    @pytest.mark.asyncio
    async def test_creates_data_source_with_credentials(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Creating a DS with credentials sets has_credentials=true."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
            name="KG with creds",
        )

        response = await _post_data_source(
            async_client,
            tenant_auth_headers,
            kg_id,
            {
                "name": "DS with creds",
                "adapter_type": "github",
                "connection_config": {"owner": "test", "repo": "test-repo"},
                "credentials": {"token": "ghp_secrettoken123"},
            },
        )

        assert response.status_code == 201
        body = response.json()
        assert body["has_credentials"] is True
        # Credentials should NOT be in response
        assert "credentials" not in body
        assert "credentials_path" not in body


class TestDataSourceTriggerSync:
    """Tests for POST /management/data-sources/{ds_id}/sync."""

    @pytest.mark.asyncio
    async def test_trigger_sync_returns_202(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Triggering a sync returns 202 with sync run details."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
            name="KG for sync",
        )

        created = await _post_data_source(
            async_client,
            tenant_auth_headers,
            kg_id,
            {
                "name": "Sync DS",
                "adapter_type": "github",
                "connection_config": {"owner": "test", "repo": "test-repo"},
            },
        )
        assert created.status_code == 201
        ds_id = created.json()["id"]

        # Set up SpiceDB permissions for the DS
        await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id)

        # Trigger sync
        response = await async_client.post(
            f"/management/data-sources/{ds_id}/sync",
            headers=tenant_auth_headers,
        )

        assert response.status_code == 202
        body = response.json()
        assert body["data_source_id"] == ds_id
        assert body["status"] == "pending"
        assert "id" in body
        assert "started_at" in body


class TestGetDataSource:
    """Tests for GET /management/data-sources/{ds_id}."""

    @pytest.mark.asyncio
    async def test_get_data_source_by_id(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Getting a DS by ID returns 200 with the expected fields."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
            name="KG for get DS",
        )

        created = await _post_data_source(
            async_client,
            tenant_auth_headers,
            kg_id,
            {
                "name": "Get DS Test",
                "adapter_type": "github",
                "connection_config": {"owner": "test", "repo": "test-repo"},
            },
        )
        assert created.status_code == 201
        ds_id = created.json()["id"]

        # Set up SpiceDB permissions for the DS
        await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id)

        response = await async_client.get(
            f"/management/data-sources/{ds_id}",
            headers=tenant_auth_headers,
        )

        assert response.status_code == 200
        body = response.json()
        assert body["id"] == ds_id
        assert body["name"] == "Get DS Test"
        assert body["adapter_type"] == "github"
        assert body["knowledge_graph_id"] == kg_id
        assert body["has_credentials"] is False
        assert body["schedule_type"] == "manual"


class TestListDataSources:
    """Tests for GET /management/knowledge-graphs/{kg_id}/data-sources."""

    @pytest.mark.asyncio
    async def test_list_data_sources_for_kg(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Listing DSes for a KG returns 200 with pagination metadata."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
            name="KG for list DS",
        )

        for index in range(2):
            created = await _post_data_source(
                async_client,
                tenant_auth_headers,
                kg_id,
                {
                    "name": f"List DS {index}",
                    "adapter_type": "github",
                    "connection_config": {"owner": "test", "repo": f"repo-{index}"},
                },
            )
            assert created.status_code == 201

        response = await async_client.get(
            f"/management/knowledge-graphs/{kg_id}/data-sources",
            headers=tenant_auth_headers,
        )

        assert response.status_code == 200
        body = response.json()
        assert body["total"] == 2
        assert len(body["items"]) == 2
        assert "offset" in body
        assert "limit" in body


class TestUpdateDataSource:
    """Tests for PATCH /management/data-sources/{ds_id}."""

    @pytest.mark.asyncio
    async def test_update_data_source(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Updating a DS returns 200 with the updated name."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
            name="KG for update DS",
        )

        created = await _post_data_source(
            async_client,
            tenant_auth_headers,
            kg_id,
            {
                "name": "Original DS",
                "adapter_type": "github",
                "connection_config": {"owner": "test", "repo": "test-repo"},
            },
        )
        assert created.status_code == 201
        ds_id = created.json()["id"]

        # Set up SpiceDB permissions for the DS
        await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id)

        response = await async_client.patch(
            f"/management/data-sources/{ds_id}",
            headers=tenant_auth_headers,
            json={"name": "Updated Name"},
        )

        assert response.status_code == 200
        body = response.json()
        assert body["name"] == "Updated Name"


class TestDeleteDataSource:
    """Tests for DELETE /management/data-sources/{ds_id}."""

    @pytest.mark.asyncio
    async def test_delete_data_source(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Deleting a DS returns 204 and a subsequent GET returns 404."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)
        kg_id = await _create_kg(
            async_client,
            tenant_auth_headers,
            spicedb_client,
            alice_user_id,
            workspace_id,
            name="KG for delete DS",
        )

        created = await _post_data_source(
            async_client,
            tenant_auth_headers,
            kg_id,
            {
                "name": "Delete DS",
                "adapter_type": "github",
                "connection_config": {"owner": "test", "repo": "test-repo"},
            },
        )
        assert created.status_code == 201
        ds_id = created.json()["id"]

        # Set up SpiceDB permissions for the DS
        await grant_ds_permission(spicedb_client, alice_user_id, ds_id, kg_id)

        response = await async_client.delete(
            f"/management/data-sources/{ds_id}",
            headers=tenant_auth_headers,
        )
        assert response.status_code == 204

        # Verify DS is gone
        response = await async_client.get(
            f"/management/data-sources/{ds_id}",
            headers=tenant_auth_headers,
        )
        assert response.status_code == 404


class TestDataSourceAuthorization:
    """Tests for authorization enforcement on DS endpoints."""

    @pytest.mark.asyncio
    async def test_unauthorized_user_gets_403(
        self,
        async_client: AsyncClient,
        bob_tenant_auth_headers: dict,
        clean_management_data,
    ):
        """A user without KG edit permission gets 403."""
        response = await async_client.post(
            "/management/knowledge-graphs/01JFAKEKG0000000000000000000/data-sources",
            headers=bob_tenant_auth_headers,
            json={
                "name": "Unauthorized DS",
                "adapter_type": "github",
                "connection_config": {},
            },
        )

        assert response.status_code == 403
"""Integration tests for Knowledge Graph API endpoints.

Tests the full stack: HTTP request -> route -> DI -> service ->
SpiceDB + PostgreSQL -> response.
"""

from __future__ import annotations

import pytest
from httpx import AsyncClient

from shared_kernel.authorization.protocols import AuthorizationProvider
from tests.integration.management.conftest import grant_kg_permission

pytestmark = [pytest.mark.integration, pytest.mark.keycloak]


async def _get_root_workspace_id(
    async_client: AsyncClient, tenant_auth_headers: dict
) -> str:
    """Return the root workspace ID for the default tenant."""
    response = await async_client.get("/iam/workspaces", headers=tenant_auth_headers)
    assert response.status_code == 200
    workspaces = response.json()["workspaces"]
    root = next((w for w in workspaces if w["is_root"]), None)
    assert root is not None, "Root workspace should exist"
    return root["id"]


async def _post_kg(
    async_client: AsyncClient,
    tenant_auth_headers: dict,
    workspace_id: str,
    payload: dict,
):
    """POST a knowledge graph under *workspace_id*; return the raw response."""
    return await async_client.post(
        f"/management/workspaces/{workspace_id}/knowledge-graphs",
        headers=tenant_auth_headers,
        json=payload,
    )


class TestKnowledgeGraphCreation:
    """Tests for POST /management/workspaces/{workspace_id}/knowledge-graphs."""

    @pytest.mark.asyncio
    async def test_creates_knowledge_graph(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        clean_management_data,
    ):
        """Creating a KG via the API returns 201 with the expected fields."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)

        response = await _post_kg(
            async_client,
            tenant_auth_headers,
            workspace_id,
            {"name": "Test KG", "description": "Integration test KG"},
        )

        assert response.status_code == 201, (
            f"Unexpected: {response.status_code} {response.text}"
        )
        body = response.json()
        assert body["name"] == "Test KG"
        assert body["description"] == "Integration test KG"
        assert body["workspace_id"] == workspace_id
        assert "id" in body
        assert "tenant_id" in body
        assert "created_at" in body
        assert "updated_at" in body

    @pytest.mark.asyncio
    async def test_duplicate_name_returns_409(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        clean_management_data,
    ):
        """Creating a KG with a duplicate name in the workspace returns 409."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)

        first = await _post_kg(
            async_client, tenant_auth_headers, workspace_id, {"name": "Unique KG"}
        )
        assert first.status_code == 201

        duplicate = await _post_kg(
            async_client, tenant_auth_headers, workspace_id, {"name": "Unique KG"}
        )
        assert duplicate.status_code == 409


class TestKnowledgeGraphList:
    """Tests for GET /management/workspaces/{workspace_id}/knowledge-graphs."""

    @pytest.mark.asyncio
    async def test_lists_knowledge_graphs_with_pagination(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Listing KGs honors offset/limit and reports the full total."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)

        for index in range(3):
            created = await _post_kg(
                async_client,
                tenant_auth_headers,
                workspace_id,
                {"name": f"KG {index}"},
            )
            assert created.status_code == 201
            kg_id = created.json()["id"]

            # Set up SpiceDB relationships so the KG is visible in listings
            await grant_kg_permission(
                spicedb_client, alice_user_id, kg_id, workspace_id
            )

        response = await async_client.get(
            f"/management/workspaces/{workspace_id}/knowledge-graphs?offset=0&limit=2",
            headers=tenant_auth_headers,
        )

        assert response.status_code == 200
        body = response.json()
        assert body["total"] == 3
        assert len(body["items"]) == 2
        assert body["offset"] == 0
        assert body["limit"] == 2


class TestKnowledgeGraphGet:
    """Tests for GET /management/knowledge-graphs/{kg_id}."""

    @pytest.mark.asyncio
    async def test_gets_knowledge_graph(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        spicedb_client: AuthorizationProvider,
        alice_user_id: str,
        clean_management_data,
    ):
        """Getting a KG by ID returns 200 with its details."""
        workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers)

        created = await _post_kg(
            async_client, tenant_auth_headers, workspace_id, {"name": "Get Test KG"}
        )
        assert created.status_code == 201
        kg_id = created.json()["id"]

        # Grant SpiceDB permissions
        await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id)

        response = await async_client.get(
            f"/management/knowledge-graphs/{kg_id}",
            headers=tenant_auth_headers,
        )

        assert response.status_code == 200
        body = response.json()
        assert body["id"] == kg_id
        assert body["name"] == "Get Test KG"

    @pytest.mark.asyncio
    async def test_get_nonexistent_returns_404(
        self,
        async_client: AsyncClient,
        tenant_auth_headers: dict,
        clean_management_data,
    ):
        """Getting a nonexistent KG returns 404."""
        response = await async_client.get(
            "/management/knowledge-graphs/01JNONEXISTENT000000000000",
            headers=tenant_auth_headers,
        )

        assert response.status_code == 404
+ + # Update KG + resp = await async_client.patch( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + json={"name": "Updated Name"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated Name" + + +class TestKnowledgeGraphDelete: + """Tests for DELETE /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_deletes_knowledge_graph( + self, + async_client: AsyncClient, + tenant_auth_headers: dict, + spicedb_client: AuthorizationProvider, + alice_user_id: str, + clean_management_data, + ): + """Test deleting a KG returns 204.""" + workspace_id = await _get_root_workspace_id(async_client, tenant_auth_headers) + + # Create KG + resp = await async_client.post( + f"/management/workspaces/{workspace_id}/knowledge-graphs", + headers=tenant_auth_headers, + json={"name": "Delete Test KG"}, + ) + assert resp.status_code == 201 + kg_id = resp.json()["id"] + + # Grant SpiceDB permissions (need manage for delete) + await grant_kg_permission(spicedb_client, alice_user_id, kg_id, workspace_id) + + # Delete KG + resp = await async_client.delete( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + ) + + assert resp.status_code == 204 + + # Verify it's gone + resp = await async_client.get( + f"/management/knowledge-graphs/{kg_id}", + headers=tenant_auth_headers, + ) + assert resp.status_code == 404 + + +class TestKnowledgeGraphAuthorization: + """Tests for authorization enforcement on KG endpoints.""" + + @pytest.mark.asyncio + async def test_unauthorized_user_gets_403( + self, + async_client: AsyncClient, + bob_tenant_auth_headers: dict, + clean_management_data, + ): + """Test that a user without workspace edit permission gets 403.""" + # Bob has tenant membership but no workspace admin/edit + resp = await async_client.post( + "/management/workspaces/01JFAKEWORKSPACE00000000000/knowledge-graphs", + headers=bob_tenant_auth_headers, + json={"name": "Unauthorized KG"}, + ) 
+ + assert resp.status_code == 403 diff --git a/src/api/tests/integration/management/test_knowledge_graph_repository.py b/src/api/tests/integration/management/test_knowledge_graph_repository.py index 0d81b10b..e44b88bb 100644 --- a/src/api/tests/integration/management/test_knowledge_graph_repository.py +++ b/src/api/tests/integration/management/test_knowledge_graph_repository.py @@ -130,21 +130,22 @@ async def test_update_records_outbox_event( await knowledge_graph_repository.save(kg) # Clear outbox of the create event so we can isolate the update event - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) kg.update(name="After Update", description="After") async with async_session.begin(): await knowledge_graph_repository.save(kg) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "knowledge_graph" @@ -245,23 +246,23 @@ async def test_delete_kg_with_data_sources_raises_integrity_error( from ulid import ULID ds_id = str(ULID()) - await async_session.execute( - text( - "INSERT INTO data_sources " - "(id, knowledge_graph_id, tenant_id, name, adapter_type, " - "connection_config, schedule_type, created_at, updated_at) " - "VALUES (:id, :kg_id, :tid, :name, :adapter, " - "'{}'::jsonb, 'MANUAL', NOW(), NOW())" - ), - { - "id": ds_id, - "kg_id": kg.id.value, - "tid": test_tenant, - "name": "child-ds", - "adapter": DataSourceAdapterType.GITHUB.value, - }, - ) - await async_session.commit() + async with async_session.begin(): + await 
async_session.execute( + text( + "INSERT INTO data_sources " + "(id, knowledge_graph_id, tenant_id, name, adapter_type, " + "connection_config, schedule_type, created_at, updated_at) " + "VALUES (:id, :kg_id, :tid, :name, :adapter, " + "'{}'::jsonb, 'MANUAL', NOW(), NOW())" + ), + { + "id": ds_id, + "kg_id": kg.id.value, + "tid": test_tenant, + "name": "child-ds", + "adapter": DataSourceAdapterType.GITHUB.value, + }, + ) kg.mark_for_deletion() @@ -341,26 +342,26 @@ async def test_finds_knowledge_graphs_by_tenant( other_tenant_id = str(ULID()) other_workspace_id = str(ULID()) - await async_session.execute( - text( - "INSERT INTO tenants (id, name, created_at, updated_at) " - "VALUES (:id, :name, NOW(), NOW())" - ), - {"id": other_tenant_id, "name": f"other-tenant-{other_tenant_id}"}, - ) - await async_session.execute( - text( - "INSERT INTO workspaces (id, tenant_id, name, is_root, created_at, updated_at) " - "VALUES (:id, :tenant_id, :name, :is_root, NOW(), NOW())" - ), - { - "id": other_workspace_id, - "tenant_id": other_tenant_id, - "name": f"other-workspace-{other_workspace_id}", - "is_root": True, - }, - ) - await async_session.commit() + async with async_session.begin(): + await async_session.execute( + text( + "INSERT INTO tenants (id, name, created_at, updated_at) " + "VALUES (:id, :name, NOW(), NOW())" + ), + {"id": other_tenant_id, "name": f"other-tenant-{other_tenant_id}"}, + ) + await async_session.execute( + text( + "INSERT INTO workspaces (id, tenant_id, name, is_root, created_at, updated_at) " + "VALUES (:id, :tenant_id, :name, :is_root, NOW(), NOW())" + ), + { + "id": other_workspace_id, + "tenant_id": other_tenant_id, + "name": f"other-workspace-{other_workspace_id}", + "is_root": True, + }, + ) # Create 1 KG in the other tenant kg_other = KnowledgeGraph.create( @@ -374,8 +375,9 @@ async def test_finds_knowledge_graphs_by_tenant( await knowledge_graph_repository.save(kg_other) # Query for the test tenant - results = await 
knowledge_graph_repository.find_by_tenant(test_tenant) + results, total = await knowledge_graph_repository.find_by_tenant(test_tenant) + assert total == 2 assert len(results) == 2 result_ids = {r.id.value for r in results} assert kg1.id.value in result_ids @@ -388,9 +390,12 @@ async def test_returns_empty_list_for_tenant_with_no_graphs( clean_management_data, ): """Should return an empty list when tenant has no knowledge graphs.""" - results = await knowledge_graph_repository.find_by_tenant("nonexistent-tenant") + results, total = await knowledge_graph_repository.find_by_tenant( + "nonexistent-tenant" + ) assert results == [] + assert total == 0 class TestOutboxConsistency: @@ -407,8 +412,8 @@ async def test_save_records_outbox_event( ): """Should record a KnowledgeGraphCreated event in the outbox table.""" # Clear any pre-existing outbox entries - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) kg = KnowledgeGraph.create( tenant_id=test_tenant, @@ -420,13 +425,14 @@ async def test_save_records_outbox_event( async with async_session.begin(): await knowledge_graph_repository.save(kg) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "knowledge_graph" @@ -454,8 +460,8 @@ async def test_delete_records_outbox_event( await knowledge_graph_repository.save(kg) # Clear outbox of the create event so we can isolate the delete event - await async_session.execute(text("DELETE FROM outbox")) - await async_session.commit() + async with 
async_session.begin(): + await async_session.execute(text("DELETE FROM outbox")) async with async_session.begin(): retrieved = await knowledge_graph_repository.get_by_id(kg.id) @@ -464,13 +470,14 @@ async def test_delete_records_outbox_event( retrieved.mark_for_deletion() await knowledge_graph_repository.delete(retrieved) - result = await async_session.execute( - text( - "SELECT aggregate_type, event_type, aggregate_id " - "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + async with async_session.begin(): + result = await async_session.execute( + text( + "SELECT aggregate_type, event_type, aggregate_id " + "FROM outbox WHERE aggregate_type = 'knowledge_graph'" + ) ) - ) - rows = result.fetchall() + rows = result.fetchall() assert len(rows) == 1 assert rows[0].aggregate_type == "knowledge_graph" diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index e1d044f3..2676f15f 100644 --- a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -375,7 +375,7 @@ async def test_list_checks_view_permission_on_kg( """list_for_knowledge_graph() checks VIEW on the KG.""" mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) - mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_ds_repo.find_by_knowledge_graph.return_value = ([], 0) await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) @@ -436,11 +436,14 @@ async def test_list_returns_data_sources( mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) ds1 = _make_ds(ds_id="ds-001") ds2 = _make_ds(ds_id="ds-002") - mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + mock_ds_repo.find_by_knowledge_graph.return_value = ([ds1, ds2], 2) - result = await 
service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + result, total = await service.list_for_knowledge_graph( + user_id=user_id, kg_id=kg_id + ) assert len(result) == 2 + assert total == 2 mock_probe.data_sources_listed.assert_called_once_with( kg_id=kg_id, count=2, diff --git a/src/api/tests/unit/management/application/test_knowledge_graph_service.py b/src/api/tests/unit/management/application/test_knowledge_graph_service.py index 44e0923f..8bd5f610 100644 --- a/src/api/tests/unit/management/application/test_knowledge_graph_service.py +++ b/src/api/tests/unit/management/application/test_knowledge_graph_service.py @@ -362,11 +362,12 @@ async def test_list_uses_read_relationships_to_discover_kgs( ] mock_kg_repo.get_by_id.side_effect = [kg1, kg2] - result = await service.list_for_workspace( + result, total = await service.list_for_workspace( user_id=user_id, workspace_id=workspace_id ) assert len(result) == 2 + assert total == 2 mock_probe.knowledge_graphs_listed.assert_called_once_with( workspace_id=workspace_id, count=2, @@ -394,11 +395,12 @@ async def test_list_filters_by_tenant( ] mock_kg_repo.get_by_id.side_effect = [kg_own, kg_other] - result = await service.list_for_workspace( + result, total = await service.list_for_workspace( user_id=user_id, workspace_id=workspace_id ) assert len(result) == 1 + assert total == 1 assert result[0].id.value == "kg-001" @@ -537,7 +539,7 @@ async def test_delete_checks_manage_permission_on_kg( kg = _make_kg() mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = kg - mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_ds_repo.find_by_knowledge_graph.return_value = ([], 0) mock_kg_repo.delete.return_value = True await service.delete(user_id=user_id, kg_id=kg.id.value) @@ -593,7 +595,7 @@ async def test_delete_cascades_data_sources( ds2 = MagicMock() mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = kg - 
mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + mock_ds_repo.find_by_knowledge_graph.return_value = ([ds1, ds2], 2) mock_ds_repo.delete.return_value = True mock_kg_repo.delete.return_value = True @@ -614,7 +616,7 @@ async def test_delete_probes_success( kg = _make_kg() mock_authz.check_permission.return_value = True mock_kg_repo.get_by_id.return_value = kg - mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_ds_repo.find_by_knowledge_graph.return_value = ([], 0) mock_kg_repo.delete.return_value = True await service.delete(user_id=user_id, kg_id=kg.id.value) diff --git a/src/api/tests/unit/management/presentation/__init__.py b/src/api/tests/unit/management/presentation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/tests/unit/management/presentation/test_data_source_routes.py b/src/api/tests/unit/management/presentation/test_data_source_routes.py new file mode 100644 index 00000000..583f5258 --- /dev/null +++ b/src/api/tests/unit/management/presentation/test_data_source_routes.py @@ -0,0 +1,435 @@ +"""Unit tests for Data Source route handlers. + +Tests route-level behavior including status codes, response shapes, +error handling, and pagination. Service dependencies are mocked. 
+""" + +from __future__ import annotations + +from datetime import UTC, datetime +from unittest.mock import AsyncMock + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient + +from iam.application.value_objects import CurrentUser +from iam.domain.value_objects import TenantId, UserId +from management.domain.aggregates import DataSource +from management.domain.entities import DataSourceSyncRun +from management.domain.value_objects import DataSourceId, Schedule, ScheduleType +from management.ports.exceptions import ( + DuplicateDataSourceNameError, + UnauthorizedError, +) +from shared_kernel.datasource_types import DataSourceAdapterType + +# Fixed test data +TENANT_ID = "test-tenant-id" +KG_ID = "01JTEST00000000000000KG001" +DS_ID = "01JTEST00000000000000DS001" +USER_ID = "test-user-id" +NOW = datetime(2025, 1, 1, tzinfo=UTC) + + +def _make_current_user() -> CurrentUser: + """Create a CurrentUser for dependency override.""" + return CurrentUser( + user_id=UserId(value=USER_ID), + username="testuser", + tenant_id=TenantId(value=TENANT_ID), + ) + + +def _make_ds( + ds_id: str = DS_ID, + name: str = "Test DS", + credentials_path: str | None = None, +) -> DataSource: + """Create a DataSource aggregate for testing.""" + return DataSource( + id=DataSourceId(value=ds_id), + knowledge_graph_id=KG_ID, + tenant_id=TENANT_ID, + name=name, + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"owner": "test", "repo": "test-repo"}, + credentials_path=credentials_path, + schedule=Schedule(schedule_type=ScheduleType.MANUAL), + last_sync_at=None, + created_at=NOW, + updated_at=NOW, + ) + + +def _make_sync_run(ds_id: str = DS_ID) -> DataSourceSyncRun: + """Create a DataSourceSyncRun entity for testing.""" + return DataSourceSyncRun( + id="01JTEST0000000000000SYNC01", + data_source_id=ds_id, + status="pending", + started_at=NOW, + completed_at=None, + error=None, + created_at=NOW, + ) + + +@pytest_asyncio.fixture +async def mock_service(): + 
"""Create a mock DataSourceService.""" + return AsyncMock() + + +@pytest_asyncio.fixture +async def client(mock_service): + """Create an async HTTP client with mocked dependencies.""" + from main import app + + from iam.dependencies.user import get_current_user + from management.dependencies.data_source import get_data_source_service + + app.dependency_overrides[get_current_user] = lambda: _make_current_user() + app.dependency_overrides[get_data_source_service] = lambda: mock_service + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as c: + yield c + + app.dependency_overrides.clear() + + +class TestCreateDataSource: + """Tests for POST /management/knowledge-graphs/{kg_id}/data-sources.""" + + @pytest.mark.asyncio + async def test_creates_successfully(self, client, mock_service): + """Test successful creation returns 201 with correct response shape.""" + ds = _make_ds() + mock_service.create.return_value = ds + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + }, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["id"] == DS_ID + assert data["knowledge_graph_id"] == KG_ID + assert data["tenant_id"] == TENANT_ID + assert data["name"] == "Test DS" + assert data["adapter_type"] == "github" + assert data["connection_config"] == {"owner": "test", "repo": "test-repo"} + assert data["has_credentials"] is False + assert data["schedule_type"] == "manual" + assert data["schedule_value"] is None + assert data["last_sync_at"] is None + + @pytest.mark.asyncio + async def test_creates_with_credentials(self, client, mock_service): + """Test creation with credentials sets has_credentials=true.""" + ds = _make_ds(credentials_path="datasource/ds1/credentials") + mock_service.create.return_value = ds + + resp = await client.post( + 
f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {"owner": "test", "repo": "test-repo"}, + "credentials": {"token": "secret"}, + }, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["has_credentials"] is True + # Credentials should NOT be in response + assert "credentials" not in data + assert "credentials_path" not in data + + mock_service.create.assert_awaited_once_with( + user_id=USER_ID, + kg_id=KG_ID, + name="Test DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"owner": "test", "repo": "test-repo"}, + raw_credentials={"token": "secret"}, + ) + + @pytest.mark.asyncio + async def test_invalid_adapter_type_returns_400(self, client, mock_service): + """Test that invalid adapter type returns 400.""" + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "invalid_type", + "connection_config": {}, + }, + ) + + assert resp.status_code == 400 + assert "adapter type" in resp.json()["detail"].lower() + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.create.side_effect = UnauthorizedError("denied") + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateDataSourceNameError maps to 409.""" + mock_service.create.side_effect = DuplicateDataSourceNameError("dup") + + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "Test DS", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 409 + + 
@pytest.mark.asyncio + async def test_empty_name_returns_422(self, client, mock_service): + """Test that Pydantic validation rejects empty name.""" + resp = await client.post( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + json={ + "name": "", + "adapter_type": "github", + "connection_config": {}, + }, + ) + + assert resp.status_code == 422 + + +class TestListDataSources: + """Tests for GET /management/knowledge-graphs/{kg_id}/data-sources.""" + + @pytest.mark.asyncio + async def test_lists_successfully(self, client, mock_service): + """Test successful list returns 200 with correct pagination.""" + data_sources = [ + _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(3) + ] + mock_service.list_for_knowledge_graph.return_value = (data_sources, 3) + + resp = await client.get( + f"/management/knowledge-graphs/{KG_ID}/data-sources", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["items"]) == 3 + assert data["offset"] == 0 + assert data["limit"] == 20 + + @pytest.mark.asyncio + async def test_pagination_offset_limit(self, client, mock_service): + """Test that offset and limit query params work correctly.""" + # Service returns only the paginated slice; total reflects full count + data_sources = [ + _make_ds(ds_id=f"01JTEST00000000000000DS00{i}") for i in range(2) + ] + mock_service.list_for_knowledge_graph.return_value = (data_sources, 5) + + resp = await client.get( + f"/management/knowledge-graphs/{KG_ID}/data-sources?offset=2&limit=2", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 5 + assert len(data["items"]) == 2 + assert data["offset"] == 2 + assert data["limit"] == 2 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.list_for_knowledge_graph.side_effect = UnauthorizedError("denied") + + resp = await client.get( + 
f"/management/knowledge-graphs/{KG_ID}/data-sources", + ) + + assert resp.status_code == 403 + + +class TestGetDataSource: + """Tests for GET /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_gets_successfully(self, client, mock_service): + """Test successful get returns 200 with correct response.""" + ds = _make_ds() + mock_service.get.return_value = ds + + resp = await client.get(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == DS_ID + assert data["name"] == "Test DS" + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that None from service maps to 404.""" + mock_service.get.return_value = None + + resp = await client.get(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 404 + + +class TestUpdateDataSource: + """Tests for PATCH /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_updates_successfully(self, client, mock_service): + """Test successful update returns 200.""" + ds = _make_ds(name="Updated DS") + mock_service.update.return_value = ds + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "Updated DS"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated DS" + + mock_service.update.assert_awaited_once_with( + user_id=USER_ID, + ds_id=DS_ID, + name="Updated DS", + connection_config=None, + raw_credentials=None, + ) + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.update.side_effect = UnauthorizedError("denied") + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that 
DuplicateDataSourceNameError maps to 409.""" + mock_service.update.side_effect = DuplicateDataSourceNameError("dup") + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "Existing Name"}, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that ValueError with 'not found' from service maps to 404.""" + mock_service.update.side_effect = ValueError("not found") + + resp = await client.patch( + f"/management/data-sources/{DS_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 404 + + +class TestDeleteDataSource: + """Tests for DELETE /management/data-sources/{ds_id}.""" + + @pytest.mark.asyncio + async def test_deletes_successfully(self, client, mock_service): + """Test successful delete returns 204.""" + mock_service.delete.return_value = True + + resp = await client.delete(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 204 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that False from service maps to 404.""" + mock_service.delete.return_value = False + + resp = await client.delete(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.delete.side_effect = UnauthorizedError("denied") + + resp = await client.delete(f"/management/data-sources/{DS_ID}") + + assert resp.status_code == 403 + + +class TestTriggerSync: + """Tests for POST /management/data-sources/{ds_id}/sync.""" + + @pytest.mark.asyncio + async def test_triggers_successfully(self, client, mock_service): + """Test successful sync trigger returns 202.""" + sync_run = _make_sync_run() + mock_service.trigger_sync.return_value = sync_run + + resp = await client.post(f"/management/data-sources/{DS_ID}/sync") + + assert 
resp.status_code == 202 + data = resp.json() + assert data["id"] == "01JTEST0000000000000SYNC01" + assert data["data_source_id"] == DS_ID + assert data["status"] == "pending" + assert data["completed_at"] is None + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.trigger_sync.side_effect = UnauthorizedError("denied") + + resp = await client.post(f"/management/data-sources/{DS_ID}/sync") + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that ValueError maps to 404.""" + mock_service.trigger_sync.side_effect = ValueError("not found") + + resp = await client.post(f"/management/data-sources/{DS_ID}/sync") + + assert resp.status_code == 404 diff --git a/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py new file mode 100644 index 00000000..726e522d --- /dev/null +++ b/src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py @@ -0,0 +1,377 @@ +"""Unit tests for Knowledge Graph route handlers. + +Tests route-level behavior including status codes, response shapes, +error handling, and pagination. Service dependencies are mocked. 
+""" + +from __future__ import annotations + +from datetime import UTC, datetime +from unittest.mock import AsyncMock + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient + +from iam.application.value_objects import CurrentUser +from iam.domain.value_objects import TenantId, UserId +from management.domain.aggregates import KnowledgeGraph +from management.domain.value_objects import KnowledgeGraphId +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) + +# Fixed test data +TENANT_ID = "test-tenant-id" +WORKSPACE_ID = "test-workspace-id" +USER_ID = "test-user-id" +KG_ID = "01JTEST00000000000000KG001" +NOW = datetime(2025, 1, 1, tzinfo=UTC) + + +def _make_current_user() -> CurrentUser: + """Create a CurrentUser for dependency override.""" + return CurrentUser( + user_id=UserId(value=USER_ID), + username="testuser", + tenant_id=TenantId(value=TENANT_ID), + ) + + +def _make_kg( + kg_id: str = KG_ID, + name: str = "Test KG", + description: str = "A test knowledge graph", +) -> KnowledgeGraph: + """Create a KnowledgeGraph aggregate for testing.""" + return KnowledgeGraph( + id=KnowledgeGraphId(value=kg_id), + tenant_id=TENANT_ID, + workspace_id=WORKSPACE_ID, + name=name, + description=description, + created_at=NOW, + updated_at=NOW, + ) + + +@pytest_asyncio.fixture +async def mock_service(): + """Create a mock KnowledgeGraphService.""" + return AsyncMock() + + +@pytest_asyncio.fixture +async def client(mock_service): + """Create an async HTTP client with mocked dependencies.""" + from main import app + + from iam.dependencies.user import get_current_user + from management.dependencies.knowledge_graph import get_knowledge_graph_service + + app.dependency_overrides[get_current_user] = lambda: _make_current_user() + app.dependency_overrides[get_knowledge_graph_service] = lambda: mock_service + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, 
base_url="http://test") as c: + yield c + + app.dependency_overrides.clear() + + +class TestCreateKnowledgeGraph: + """Tests for POST /management/workspaces/{workspace_id}/knowledge-graphs.""" + + @pytest.mark.asyncio + async def test_creates_successfully(self, client, mock_service): + """Test successful creation returns 201 with correct response shape.""" + kg = _make_kg() + mock_service.create.return_value = kg + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG", "description": "A test knowledge graph"}, + ) + + assert resp.status_code == 201 + data = resp.json() + assert data["id"] == KG_ID + assert data["tenant_id"] == TENANT_ID + assert data["workspace_id"] == WORKSPACE_ID + assert data["name"] == "Test KG" + assert data["description"] == "A test knowledge graph" + assert "created_at" in data + assert "updated_at" in data + + mock_service.create.assert_awaited_once_with( + user_id=USER_ID, + workspace_id=WORKSPACE_ID, + name="Test KG", + description="A test knowledge graph", + ) + + @pytest.mark.asyncio + async def test_default_description(self, client, mock_service): + """Test that description defaults to empty string.""" + kg = _make_kg(description="") + mock_service.create.return_value = kg + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG"}, + ) + + assert resp.status_code == 201 + mock_service.create.assert_awaited_once_with( + user_id=USER_ID, + workspace_id=WORKSPACE_ID, + name="Test KG", + description="", + ) + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.create.side_effect = UnauthorizedError("denied") + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG"}, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def 
test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateKnowledgeGraphNameError maps to 409.""" + mock_service.create.side_effect = DuplicateKnowledgeGraphNameError("dup") + + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "Test KG"}, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_empty_name_returns_422(self, client, mock_service): + """Test that Pydantic validation rejects empty name.""" + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": ""}, + ) + + assert resp.status_code == 422 + + @pytest.mark.asyncio + async def test_name_too_long_returns_422(self, client, mock_service): + """Test that Pydantic validation rejects names over 100 characters.""" + resp = await client.post( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + json={"name": "x" * 101}, + ) + + assert resp.status_code == 422 + + +class TestListKnowledgeGraphs: + """Tests for GET /management/workspaces/{workspace_id}/knowledge-graphs.""" + + @pytest.mark.asyncio + async def test_lists_successfully(self, client, mock_service): + """Test successful list returns 200 with correct pagination.""" + kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(3)] + mock_service.list_for_workspace.return_value = (kgs, 3) + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 3 + assert len(data["items"]) == 3 + assert data["offset"] == 0 + assert data["limit"] == 20 + + @pytest.mark.asyncio + async def test_pagination_offset_limit(self, client, mock_service): + """Test that offset and limit query params work correctly.""" + # Service returns only the paginated slice; total reflects full count + kgs = [_make_kg(kg_id=f"01JTEST00000000000000KG00{i}") for i in range(2)] + 
mock_service.list_for_workspace.return_value = (kgs, 5) + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs?offset=1&limit=2", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 5 + assert len(data["items"]) == 2 + assert data["offset"] == 1 + assert data["limit"] == 2 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.list_for_workspace.side_effect = UnauthorizedError("denied") + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_empty_list(self, client, mock_service): + """Test listing returns empty result correctly.""" + mock_service.list_for_workspace.return_value = ([], 0) + + resp = await client.get( + f"/management/workspaces/{WORKSPACE_ID}/knowledge-graphs", + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["items"] == [] + + +class TestGetKnowledgeGraph: + """Tests for GET /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_gets_successfully(self, client, mock_service): + """Test successful get returns 200 with correct response.""" + kg = _make_kg() + mock_service.get.return_value = kg + + resp = await client.get(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 200 + data = resp.json() + assert data["id"] == KG_ID + assert data["name"] == "Test KG" + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that None from service maps to 404.""" + mock_service.get.return_value = None + + resp = await client.get(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 404 + + +class TestUpdateKnowledgeGraph: + """Tests for PATCH /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async 
def test_updates_successfully(self, client, mock_service): + """Test successful update returns 200 with updated data.""" + kg = _make_kg(name="Updated KG") + mock_service.update.return_value = kg + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "Updated KG"}, + ) + + assert resp.status_code == 200 + data = resp.json() + assert data["name"] == "Updated KG" + + mock_service.update.assert_awaited_once_with( + user_id=USER_ID, + kg_id=KG_ID, + name="Updated KG", + description=None, + ) + + @pytest.mark.asyncio + async def test_partial_update_description_only(self, client, mock_service): + """Test updating only description passes None for name.""" + kg = _make_kg(description="New desc") + mock_service.update.return_value = kg + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"description": "New desc"}, + ) + + assert resp.status_code == 200 + mock_service.update.assert_awaited_once_with( + user_id=USER_ID, + kg_id=KG_ID, + name=None, + description="New desc", + ) + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.update.side_effect = UnauthorizedError("denied") + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 403 + + @pytest.mark.asyncio + async def test_duplicate_name_returns_409(self, client, mock_service): + """Test that DuplicateKnowledgeGraphNameError maps to 409.""" + mock_service.update.side_effect = DuplicateKnowledgeGraphNameError("dup") + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "Existing Name"}, + ) + + assert resp.status_code == 409 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that ValueError with 'not found' from service maps to 404.""" + mock_service.update.side_effect = ValueError("not 
found") + + resp = await client.patch( + f"/management/knowledge-graphs/{KG_ID}", + json={"name": "New Name"}, + ) + + assert resp.status_code == 404 + + +class TestDeleteKnowledgeGraph: + """Tests for DELETE /management/knowledge-graphs/{kg_id}.""" + + @pytest.mark.asyncio + async def test_deletes_successfully(self, client, mock_service): + """Test successful delete returns 204.""" + mock_service.delete.return_value = True + + resp = await client.delete(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 204 + + @pytest.mark.asyncio + async def test_not_found_returns_404(self, client, mock_service): + """Test that False from service maps to 404.""" + mock_service.delete.return_value = False + + resp = await client.delete(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_unauthorized_returns_403(self, client, mock_service): + """Test that UnauthorizedError maps to 403.""" + mock_service.delete.side_effect = UnauthorizedError("denied") + + resp = await client.delete(f"/management/knowledge-graphs/{KG_ID}") + + assert resp.status_code == 403 diff --git a/src/api/tests/unit/management/test_architecture.py b/src/api/tests/unit/management/test_architecture.py index dd6addae..56f1e372 100644 --- a/src/api/tests/unit/management/test_architecture.py +++ b/src/api/tests/unit/management/test_architecture.py @@ -227,14 +227,16 @@ def test_management_does_not_import_iam(self): IAM manages authentication and authorization. Management should not couple to IAM's user, tenant, or API key domain objects. - The management.dependencies package is excluded because DI wiring - is a presentation-layer concern that legitimately crosses context - boundaries (e.g., extracting CurrentUser from IAM for tenant scoping). 
+ The management.dependencies and management.presentation packages + are excluded because DI wiring and route handlers are + presentation-layer concerns that legitimately cross context + boundaries (e.g., extracting CurrentUser from IAM for tenant + scoping and auth dependency injection). """ ( archrule("management_no_iam") .match("management*") - .exclude("management.dependencies*") + .exclude("management.dependencies*", "management.presentation*") .should_not_import("iam*") .check("management") ) diff --git a/src/api/uv.lock b/src/api/uv.lock index b085bb5d..a6fec3b0 100644 --- a/src/api/uv.lock +++ b/src/api/uv.lock @@ -1171,7 +1171,7 @@ wheels = [ [[package]] name = "kartograph-api" -version = "3.30.0" +version = "3.31.0" source = { virtual = "." } dependencies = [ { name = "alembic" },