diff --git a/src/api/management/application/__init__.py b/src/api/management/application/__init__.py new file mode 100644 index 00000000..17c235ea --- /dev/null +++ b/src/api/management/application/__init__.py @@ -0,0 +1,5 @@ +"""Management application layer. + +Contains application services that orchestrate domain operations +with authorization, transaction management, and observability. +""" diff --git a/src/api/management/application/observability/__init__.py b/src/api/management/application/observability/__init__.py new file mode 100644 index 00000000..514398f9 --- /dev/null +++ b/src/api/management/application/observability/__init__.py @@ -0,0 +1,20 @@ +"""Domain-Oriented Observability for Management application layer. + +Probes for application service operations following Domain-Oriented Observability patterns. +""" + +from management.application.observability.data_source_service_probe import ( + DataSourceServiceProbe, + DefaultDataSourceServiceProbe, +) +from management.application.observability.knowledge_graph_service_probe import ( + DefaultKnowledgeGraphServiceProbe, + KnowledgeGraphServiceProbe, +) + +__all__ = [ + "DataSourceServiceProbe", + "DefaultDataSourceServiceProbe", + "KnowledgeGraphServiceProbe", + "DefaultKnowledgeGraphServiceProbe", +] diff --git a/src/api/management/application/observability/data_source_service_probe.py b/src/api/management/application/observability/data_source_service_probe.py new file mode 100644 index 00000000..86d779f2 --- /dev/null +++ b/src/api/management/application/observability/data_source_service_probe.py @@ -0,0 +1,265 @@ +"""Protocol for data source application service observability. + +Defines the interface for domain probes that capture application-level +domain events for data source service operations. 
class DataSourceServiceProbe(Protocol):
    """Observability contract for the data source application service.

    Each method records one domain-significant event about a data source.
    The contract is a ``typing.Protocol``, so implementations only need to
    provide methods with matching signatures.
    """

    def data_source_created(
        self, ds_id: str, kg_id: str, tenant_id: str, name: str
    ) -> None:
        """Record that a data source was created."""

    def data_source_creation_failed(self, kg_id: str, name: str, error: str) -> None:
        """Record that creating a data source failed."""

    def data_source_retrieved(self, ds_id: str) -> None:
        """Record that a data source was retrieved."""

    def data_source_updated(self, ds_id: str, name: str) -> None:
        """Record that a data source was updated."""

    def data_source_deleted(self, ds_id: str) -> None:
        """Record that a data source was deleted."""

    def data_source_deletion_failed(self, ds_id: str, error: str) -> None:
        """Record that deleting a data source failed."""

    def data_sources_listed(self, kg_id: str, count: int) -> None:
        """Record that the data sources of a knowledge graph were listed."""

    def sync_requested(self, ds_id: str) -> None:
        """Record that a sync was requested for a data source."""

    def permission_denied(
        self, user_id: str, resource_id: str, permission: str
    ) -> None:
        """Record that a permission check was denied."""

    def with_context(self, context: ObservationContext) -> DataSourceServiceProbe:
        """Return a new probe carrying the given observation context."""
class DefaultDataSourceServiceProbe:
    """structlog-backed implementation of ``DataSourceServiceProbe``.

    Every probe method emits exactly one log event named after the domain
    event, carrying the method's arguments plus the bound observation
    context (when one is set).
    """

    def __init__(
        self,
        logger: structlog.stdlib.BoundLogger | None = None,
        context: ObservationContext | None = None,
    ):
        """Keep the given logger (or ``structlog.get_logger()``) and context."""
        self._logger = logger or structlog.get_logger()
        self._context = context

    def _get_context_kwargs(self, exclude: set[str] | None = None) -> dict[str, Any]:
        """Return the bound context as a dict, minus any keys in ``exclude``.

        Args:
            exclude: Keys to leave out so they cannot collide with the
                explicit keyword arguments of a log call.

        Returns:
            An empty dict when no context is bound, otherwise the context's
            ``as_dict()`` result with the excluded keys removed.
        """
        bound = self._context
        if bound is None:
            return {}

        as_mapping = bound.as_dict()
        if not exclude:
            return as_mapping
        return {key: value for key, value in as_mapping.items() if key not in exclude}

    def _emit(self, level: str, event: str, /, **fields: Any) -> None:
        """Log ``event`` at ``level`` with ``fields`` plus non-colliding context."""
        extra = self._get_context_kwargs(exclude=set(fields))
        getattr(self._logger, level)(event, **fields, **extra)

    def with_context(
        self, context: ObservationContext
    ) -> DefaultDataSourceServiceProbe:
        """Return a new probe that shares this logger and binds ``context``."""
        return DefaultDataSourceServiceProbe(logger=self._logger, context=context)

    def data_source_created(
        self, ds_id: str, kg_id: str, tenant_id: str, name: str
    ) -> None:
        """Log ``data_source_created`` at info level."""
        self._emit(
            "info",
            "data_source_created",
            ds_id=ds_id,
            kg_id=kg_id,
            tenant_id=tenant_id,
            name=name,
        )

    def data_source_creation_failed(self, kg_id: str, name: str, error: str) -> None:
        """Log ``data_source_creation_failed`` at error level."""
        self._emit(
            "error",
            "data_source_creation_failed",
            kg_id=kg_id,
            name=name,
            error=error,
        )

    def data_source_retrieved(self, ds_id: str) -> None:
        """Log ``data_source_retrieved`` at debug level."""
        self._emit("debug", "data_source_retrieved", ds_id=ds_id)

    def data_source_updated(self, ds_id: str, name: str) -> None:
        """Log ``data_source_updated`` at info level."""
        self._emit("info", "data_source_updated", ds_id=ds_id, name=name)

    def data_source_deleted(self, ds_id: str) -> None:
        """Log ``data_source_deleted`` at info level."""
        self._emit("info", "data_source_deleted", ds_id=ds_id)

    def data_source_deletion_failed(self, ds_id: str, error: str) -> None:
        """Log ``data_source_deletion_failed`` at error level."""
        self._emit("error", "data_source_deletion_failed", ds_id=ds_id, error=error)

    def data_sources_listed(self, kg_id: str, count: int) -> None:
        """Log ``data_sources_listed`` at debug level."""
        self._emit("debug", "data_sources_listed", kg_id=kg_id, count=count)

    def sync_requested(self, ds_id: str) -> None:
        """Log ``data_source_sync_requested`` at info level."""
        self._emit("info", "data_source_sync_requested", ds_id=ds_id)

    def permission_denied(
        self, user_id: str, resource_id: str, permission: str
    ) -> None:
        """Log ``data_source_permission_denied`` at warning level."""
        self._emit(
            "warning",
            "data_source_permission_denied",
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
class KnowledgeGraphServiceProbe(Protocol):
    """Observability contract for the knowledge graph application service.

    Each method records one domain-significant event about a knowledge
    graph. The contract is a ``typing.Protocol``, so implementations only
    need to provide methods with matching signatures.
    """

    def knowledge_graph_created(
        self, kg_id: str, tenant_id: str, workspace_id: str, name: str
    ) -> None:
        """Record that a knowledge graph was created."""

    def knowledge_graph_creation_failed(
        self, tenant_id: str, name: str, error: str
    ) -> None:
        """Record that creating a knowledge graph failed."""

    def knowledge_graph_retrieved(self, kg_id: str) -> None:
        """Record that a knowledge graph was retrieved."""

    def knowledge_graph_updated(self, kg_id: str, name: str) -> None:
        """Record that a knowledge graph was updated."""

    def knowledge_graph_deleted(self, kg_id: str) -> None:
        """Record that a knowledge graph was deleted."""

    def knowledge_graph_deletion_failed(self, kg_id: str, error: str) -> None:
        """Record that deleting a knowledge graph failed."""

    def knowledge_graphs_listed(self, workspace_id: str, count: int) -> None:
        """Record that the knowledge graphs of a workspace were listed."""

    def permission_denied(
        self, user_id: str, resource_id: str, permission: str
    ) -> None:
        """Record that a permission check was denied."""

    def with_context(self, context: ObservationContext) -> KnowledgeGraphServiceProbe:
        """Return a new probe carrying the given observation context."""
class DefaultKnowledgeGraphServiceProbe:
    """structlog-backed implementation of ``KnowledgeGraphServiceProbe``.

    Every probe method emits exactly one log event named after the domain
    event, carrying the method's arguments plus the bound observation
    context (when one is set).
    """

    def __init__(
        self,
        logger: structlog.stdlib.BoundLogger | None = None,
        context: ObservationContext | None = None,
    ):
        """Keep the given logger (or ``structlog.get_logger()``) and context."""
        self._logger = logger or structlog.get_logger()
        self._context = context

    def _get_context_kwargs(self, exclude: set[str] | None = None) -> dict[str, Any]:
        """Return the bound context as a dict, minus any keys in ``exclude``.

        Args:
            exclude: Keys to leave out so they cannot collide with the
                explicit keyword arguments of a log call.

        Returns:
            An empty dict when no context is bound, otherwise the context's
            ``as_dict()`` result with the excluded keys removed.
        """
        bound = self._context
        if bound is None:
            return {}

        as_mapping = bound.as_dict()
        if not exclude:
            return as_mapping
        return {key: value for key, value in as_mapping.items() if key not in exclude}

    def _emit(self, level: str, event: str, /, **fields: Any) -> None:
        """Log ``event`` at ``level`` with ``fields`` plus non-colliding context."""
        extra = self._get_context_kwargs(exclude=set(fields))
        getattr(self._logger, level)(event, **fields, **extra)

    def with_context(
        self, context: ObservationContext
    ) -> DefaultKnowledgeGraphServiceProbe:
        """Return a new probe that shares this logger and binds ``context``."""
        return DefaultKnowledgeGraphServiceProbe(logger=self._logger, context=context)

    def knowledge_graph_created(
        self, kg_id: str, tenant_id: str, workspace_id: str, name: str
    ) -> None:
        """Log ``knowledge_graph_created`` at info level."""
        self._emit(
            "info",
            "knowledge_graph_created",
            kg_id=kg_id,
            tenant_id=tenant_id,
            workspace_id=workspace_id,
            name=name,
        )

    def knowledge_graph_creation_failed(
        self, tenant_id: str, name: str, error: str
    ) -> None:
        """Log ``knowledge_graph_creation_failed`` at error level."""
        self._emit(
            "error",
            "knowledge_graph_creation_failed",
            tenant_id=tenant_id,
            name=name,
            error=error,
        )

    def knowledge_graph_retrieved(self, kg_id: str) -> None:
        """Log ``knowledge_graph_retrieved`` at debug level."""
        self._emit("debug", "knowledge_graph_retrieved", kg_id=kg_id)

    def knowledge_graph_updated(self, kg_id: str, name: str) -> None:
        """Log ``knowledge_graph_updated`` at info level."""
        self._emit("info", "knowledge_graph_updated", kg_id=kg_id, name=name)

    def knowledge_graph_deleted(self, kg_id: str) -> None:
        """Log ``knowledge_graph_deleted`` at info level."""
        self._emit("info", "knowledge_graph_deleted", kg_id=kg_id)

    def knowledge_graph_deletion_failed(self, kg_id: str, error: str) -> None:
        """Log ``knowledge_graph_deletion_failed`` at error level."""
        self._emit(
            "error", "knowledge_graph_deletion_failed", kg_id=kg_id, error=error
        )

    def knowledge_graphs_listed(self, workspace_id: str, count: int) -> None:
        """Log ``knowledge_graphs_listed`` at debug level."""
        self._emit(
            "debug", "knowledge_graphs_listed", workspace_id=workspace_id, count=count
        )

    def permission_denied(
        self, user_id: str, resource_id: str, permission: str
    ) -> None:
        """Log ``knowledge_graph_permission_denied`` at warning level."""
        self._emit(
            "warning",
            "knowledge_graph_permission_denied",
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
class DataSourceService:
    """Application service for data source management.

    Orchestrates data source operations with tenant scoping, authorization
    checks, credential management, and observability.

    NOTE(review): several methods read through the repositories before
    entering ``self._session.begin()``. If those repositories share this
    session, the read may already have auto-begun a transaction -- confirm
    that calling ``begin()`` afterwards is valid in this setup.
    """

    def __init__(
        self,
        session: AsyncSession,
        data_source_repository: IDataSourceRepository,
        knowledge_graph_repository: IKnowledgeGraphRepository,
        secret_store: ISecretStoreRepository,
        sync_run_repository: IDataSourceSyncRunRepository,
        authz: AuthorizationProvider,
        scope_to_tenant: str,
        probe: DataSourceServiceProbe | None = None,
    ) -> None:
        """Initialize DataSourceService with dependencies.

        Args:
            session: Database session for transaction management
            data_source_repository: Repository for DS persistence
            knowledge_graph_repository: Repository for KG lookups
            secret_store: Secret store for credential management
            sync_run_repository: Repository for sync run tracking
            authz: Authorization provider for permission checks
            scope_to_tenant: Tenant ID string to scope this service to
            probe: Optional domain probe for observability; a
                DefaultDataSourceServiceProbe is used when omitted
        """
        self._session = session
        self._ds_repo = data_source_repository
        self._kg_repo = knowledge_graph_repository
        self._secret_store = secret_store
        self._sync_run_repo = sync_run_repository
        self._authz = authz
        self._scope_to_tenant = scope_to_tenant
        self._probe = probe or DefaultDataSourceServiceProbe()

    async def _check_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        permission: Permission,
    ) -> bool:
        """Check if user has permission on a resource.

        Args:
            user_id: The user to check
            resource_type: Type of resource
            resource_id: ID of the resource
            permission: The permission to check

        Returns:
            True if user has permission, False otherwise
        """
        resource = format_resource(resource_type, resource_id)
        subject = format_subject(ResourceType.USER, user_id)
        return await self._authz.check_permission(
            resource=resource,
            permission=permission,
            subject=subject,
        )

    async def _require_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        permission: Permission,
        denied_message: str,
    ) -> None:
        """Record the denial and raise unless the user holds the permission.

        Raises:
            UnauthorizedError: With ``denied_message`` when the check fails
        """
        if await self._check_permission(
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            permission=permission,
        ):
            return

        self._probe.permission_denied(
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
        raise UnauthorizedError(denied_message)

    async def _load_in_tenant(self, ds_id: str) -> DataSource | None:
        """Fetch a data source, hiding it when it belongs to another tenant."""
        ds = await self._ds_repo.get_by_id(DataSourceId(value=ds_id))
        if ds is None or ds.tenant_id != self._scope_to_tenant:
            return None
        return ds

    async def _store_credentials(
        self, ds: DataSource, raw_credentials: dict[str, str]
    ) -> None:
        """Store credentials in the secret store and point the DS at them."""
        cred_path = f"datasource/{ds.id.value}/credentials"
        await self._secret_store.store(
            path=cred_path,
            tenant_id=self._scope_to_tenant,
            credentials=raw_credentials,
        )
        ds.credentials_path = cred_path

    async def create(
        self,
        user_id: str,
        kg_id: str,
        name: str,
        adapter_type: DataSourceAdapterType,
        connection_config: dict[str, str],
        raw_credentials: dict[str, str] | None = None,
    ) -> DataSource:
        """Create a new data source in a knowledge graph.

        Args:
            user_id: The user creating the DS
            kg_id: The knowledge graph to create the DS in
            name: Name of the data source
            adapter_type: Type of adapter (e.g., GITHUB)
            connection_config: Connection configuration key-value pairs
            raw_credentials: Optional credentials to store in the secret store

        Returns:
            The created DataSource aggregate

        Raises:
            UnauthorizedError: If user lacks EDIT permission on KG
            ValueError: If KG not found or belongs to different tenant
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.EDIT,
            denied_message=(
                f"User {user_id} lacks edit permission on knowledge graph {kg_id}"
            ),
        )

        # Verify KG exists and belongs to tenant
        kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
        if kg is None:
            raise ValueError(f"Knowledge graph {kg_id} not found")
        if kg.tenant_id != self._scope_to_tenant:
            raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant")

        try:
            async with self._session.begin():
                ds = DataSource.create(
                    knowledge_graph_id=kg_id,
                    tenant_id=self._scope_to_tenant,
                    name=name,
                    adapter_type=adapter_type,
                    connection_config=connection_config,
                    created_by=user_id,
                )

                if raw_credentials is not None:
                    await self._store_credentials(ds, raw_credentials)

                await self._ds_repo.save(ds)
        except Exception as exc:
            # Make the failure observable, then let the caller see the
            # original exception unchanged.
            self._probe.data_source_creation_failed(
                kg_id=kg_id,
                name=name,
                error=str(exc),
            )
            raise

        self._probe.data_source_created(
            ds_id=ds.id.value,
            kg_id=kg_id,
            tenant_id=self._scope_to_tenant,
            name=name,
        )

        return ds

    async def get(
        self,
        user_id: str,
        ds_id: str,
    ) -> DataSource | None:
        """Get a data source by ID with authorization check.

        Args:
            user_id: The user requesting access
            ds_id: The data source ID

        Returns:
            The DataSource aggregate, or None if not found, if it belongs to
            another tenant, or if the caller lacks VIEW permission (to avoid
            existence leakage)
        """
        ds = await self._load_in_tenant(ds_id)
        if ds is None:
            return None

        has_view = await self._check_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.VIEW,
        )

        if not has_view:
            # The caller still only sees None; the denial is recorded so it
            # is not invisible to operators.
            self._probe.permission_denied(
                user_id=user_id,
                resource_id=ds_id,
                permission=Permission.VIEW,
            )
            return None

        self._probe.data_source_retrieved(ds_id=ds_id)
        return ds

    async def list_for_knowledge_graph(
        self,
        user_id: str,
        kg_id: str,
    ) -> list[DataSource]:
        """List data sources for a knowledge graph.

        Args:
            user_id: The user requesting the list
            kg_id: The knowledge graph to list DSes for

        Returns:
            List of DataSource aggregates

        Raises:
            UnauthorizedError: If user lacks VIEW permission on KG, or the KG
                is missing or belongs to a different tenant
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.VIEW,
            denied_message=(
                f"User {user_id} lacks view permission on knowledge graph {kg_id}"
            ),
        )

        # Verify KG belongs to tenant (defense-in-depth)
        kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
        if kg is None or kg.tenant_id != self._scope_to_tenant:
            raise UnauthorizedError(f"Knowledge graph {kg_id} not accessible")

        data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id)

        self._probe.data_sources_listed(
            kg_id=kg_id,
            count=len(data_sources),
        )

        return data_sources

    async def update(
        self,
        user_id: str,
        ds_id: str,
        name: str | None = None,
        connection_config: dict[str, str] | None = None,
        raw_credentials: dict[str, str] | None = None,
    ) -> DataSource:
        """Update a data source's configuration.

        Args:
            user_id: The user performing the update
            ds_id: The data source ID
            name: Optional new name
            connection_config: Optional new connection configuration
            raw_credentials: Optional new credentials to store in the secret store

        Returns:
            The updated DataSource aggregate

        Raises:
            UnauthorizedError: If user lacks EDIT permission
            ValueError: If DS not found (or belongs to a different tenant)
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.EDIT,
            denied_message=(
                f"User {user_id} lacks edit permission on data source {ds_id}"
            ),
        )

        ds = await self._load_in_tenant(ds_id)
        if ds is None:
            raise ValueError(f"Data source {ds_id} not found")

        async with self._session.begin():
            if name is not None or connection_config is not None:
                # Omitted fields keep their current value.
                ds.update_connection(
                    name=name if name is not None else ds.name,
                    connection_config=connection_config
                    if connection_config is not None
                    else ds.connection_config,
                    credentials_path=ds.credentials_path,
                    updated_by=user_id,
                )

            if raw_credentials is not None:
                await self._store_credentials(ds, raw_credentials)

            await self._ds_repo.save(ds)

        self._probe.data_source_updated(
            ds_id=ds_id,
            name=name if name is not None else ds.name,
        )

        return ds

    async def delete(
        self,
        user_id: str,
        ds_id: str,
    ) -> bool:
        """Delete a data source.

        Args:
            user_id: The user performing the deletion
            ds_id: The data source ID

        Returns:
            True if deleted, False if not found (or in a different tenant)

        Raises:
            UnauthorizedError: If user lacks MANAGE permission
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.MANAGE,
            denied_message=(
                f"User {user_id} lacks manage permission on data source {ds_id}"
            ),
        )

        ds = await self._load_in_tenant(ds_id)
        if ds is None:
            return False

        try:
            async with self._session.begin():
                if ds.credentials_path:
                    await self._secret_store.delete(
                        path=ds.credentials_path,
                        tenant_id=self._scope_to_tenant,
                    )

                ds.mark_for_deletion(deleted_by=user_id)
                await self._ds_repo.delete(ds)
        except Exception as exc:
            # Make the failure observable, then let the caller see the
            # original exception unchanged.
            self._probe.data_source_deletion_failed(ds_id=ds_id, error=str(exc))
            raise

        self._probe.data_source_deleted(ds_id=ds_id)

        return True

    async def trigger_sync(
        self,
        user_id: str,
        ds_id: str,
    ) -> DataSourceSyncRun:
        """Trigger a sync for a data source.

        Args:
            user_id: The user triggering the sync
            ds_id: The data source ID

        Returns:
            The created DataSourceSyncRun entity (status ``"pending"``)

        Raises:
            UnauthorizedError: If user lacks MANAGE permission
            ValueError: If DS not found (or belongs to a different tenant)
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.MANAGE,
            denied_message=(
                f"User {user_id} lacks manage permission on data source {ds_id}"
            ),
        )

        ds = await self._load_in_tenant(ds_id)
        if ds is None:
            raise ValueError(f"Data source {ds_id} not found")

        now = datetime.now(UTC)

        async with self._session.begin():
            sync_run = DataSourceSyncRun(
                id=str(ULID()),
                data_source_id=ds.id.value,
                status="pending",
                started_at=now,
                completed_at=None,
                error=None,
                created_at=now,
            )
            await self._sync_run_repo.save(sync_run)

            # Record sync requested event on the data source aggregate
            ds.request_sync(requested_by=user_id)
            await self._ds_repo.save(ds)

        self._probe.sync_requested(ds_id=ds_id)

        return sync_run
+""" + +from __future__ import annotations + +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from management.application.observability import ( + DefaultKnowledgeGraphServiceProbe, + KnowledgeGraphServiceProbe, +) +from management.domain.aggregates import KnowledgeGraph +from management.domain.value_objects import KnowledgeGraphId +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) +from management.ports.repositories import ( + IDataSourceRepository, + IKnowledgeGraphRepository, +) +from shared_kernel.authorization.protocols import AuthorizationProvider +from shared_kernel.authorization.types import ( + Permission, + RelationType, + ResourceType, + format_resource, + format_subject, +) + + +class KnowledgeGraphService: + """Application service for knowledge graph management. + + Orchestrates knowledge graph operations with proper tenant scoping, + authorization checks, and business rule enforcement. + """ + + def __init__( + self, + session: AsyncSession, + knowledge_graph_repository: IKnowledgeGraphRepository, + authz: AuthorizationProvider, + scope_to_tenant: str, + probe: KnowledgeGraphServiceProbe | None = None, + data_source_repository: IDataSourceRepository | None = None, + ) -> None: + """Initialize KnowledgeGraphService with dependencies. 
+ + Args: + session: Database session for transaction management + knowledge_graph_repository: Repository for KG persistence + authz: Authorization provider for permission checks + scope_to_tenant: Tenant ID string to scope this service to + probe: Optional domain probe for observability + data_source_repository: Optional DS repository for cascade delete + """ + self._session = session + self._kg_repo = knowledge_graph_repository + self._authz = authz + self._scope_to_tenant = scope_to_tenant + self._probe = probe or DefaultKnowledgeGraphServiceProbe() + self._ds_repo = data_source_repository + + async def _check_permission( + self, + user_id: str, + resource_type: ResourceType, + resource_id: str, + permission: Permission, + ) -> bool: + """Check if user has permission on a resource. + + Args: + user_id: The user to check + resource_type: Type of resource + resource_id: ID of the resource + permission: The permission to check + + Returns: + True if user has permission, False otherwise + """ + resource = format_resource(resource_type, resource_id) + subject = format_subject(ResourceType.USER, user_id) + return await self._authz.check_permission( + resource=resource, + permission=permission, + subject=subject, + ) + + async def create( + self, + user_id: str, + workspace_id: str, + name: str, + description: str, + ) -> KnowledgeGraph: + """Create a new knowledge graph in a workspace. 
async def get(
    self,
    user_id: str,
    kg_id: str,
) -> KnowledgeGraph | None:
    """Fetch a single knowledge graph, hiding it from unauthorized callers.

    Args:
        user_id: The user requesting access
        kg_id: The knowledge graph ID

    Returns:
        The KnowledgeGraph aggregate, or None when the graph does not
        exist, belongs to another tenant, or the caller lacks VIEW
        permission. All three cases look the same to the caller so that
        existence is not leaked.
    """
    kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))

    # Missing and foreign-tenant graphs are deliberately indistinguishable.
    if kg is None or kg.tenant_id != self._scope_to_tenant:
        return None

    has_view = await self._check_permission(
        user_id=user_id,
        resource_type=ResourceType.KNOWLEDGE_GRAPH,
        resource_id=kg_id,
        permission=Permission.VIEW,
    )
    if not has_view:
        return None

    self._probe.knowledge_graph_retrieved(kg_id=kg_id)
    return kg


async def list_for_workspace(
    self,
    user_id: str,
    workspace_id: str,
) -> list[KnowledgeGraph]:
    """List the knowledge graphs attached to a workspace.

    Knowledge graph IDs are discovered through ``read_relationships`` on
    the authorization provider; each ID is then loaded from the repository
    and kept only if it belongs to the tenant this service is scoped to.

    Args:
        user_id: The user requesting the list
        workspace_id: The workspace to list KGs for

    Returns:
        List of KnowledgeGraph aggregates

    Raises:
        UnauthorizedError: If user lacks VIEW permission on workspace
    """
    has_view = await self._check_permission(
        user_id=user_id,
        resource_type=ResourceType.WORKSPACE,
        resource_id=workspace_id,
        permission=Permission.VIEW,
    )
    if not has_view:
        self._probe.permission_denied(
            user_id=user_id,
            resource_id=workspace_id,
            permission=Permission.VIEW,
        )
        raise UnauthorizedError(
            f"User {user_id} lacks view permission on workspace {workspace_id}"
        )

    # Read explicit tuples to discover KG IDs linked to this workspace.
    tuples = await self._authz.read_relationships(
        resource_type=ResourceType.KNOWLEDGE_GRAPH,
        relation=RelationType.WORKSPACE,
        subject_type=ResourceType.WORKSPACE,
        subject_id=workspace_id,
    )

    # Resource format is "knowledge_graph:ID"; anything that does not split
    # into exactly two parts is ignored.
    split_resources = (rel_tuple.resource.split(":") for rel_tuple in tuples)
    kg_ids = [parts[1] for parts in split_resources if len(parts) == 2]

    # Load one by one and drop anything missing or owned by another tenant.
    kgs: list[KnowledgeGraph] = []
    for candidate_id in kg_ids:
        kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=candidate_id))
        if kg is None or kg.tenant_id != self._scope_to_tenant:
            continue
        kgs.append(kg)

    self._probe.knowledge_graphs_listed(
        workspace_id=workspace_id,
        count=len(kgs),
    )
    return kgs


async def update(
    self,
    user_id: str,
    kg_id: str,
    name: str,
    description: str,
) -> KnowledgeGraph:
    """Update a knowledge graph's metadata.

    The aggregate is loaded, modified and saved inside one explicit
    transaction.

    Args:
        user_id: The user performing the update
        kg_id: The knowledge graph ID
        name: New name
        description: New description

    Returns:
        The updated KnowledgeGraph aggregate

    Raises:
        UnauthorizedError: If user lacks EDIT permission
        ValueError: If KG not found (or it belongs to another tenant)
        DuplicateKnowledgeGraphNameError: If name already exists
    """
    has_edit = await self._check_permission(
        user_id=user_id,
        resource_type=ResourceType.KNOWLEDGE_GRAPH,
        resource_id=kg_id,
        permission=Permission.EDIT,
    )
    if not has_edit:
        self._probe.permission_denied(
            user_id=user_id,
            resource_id=kg_id,
            permission=Permission.EDIT,
        )
        raise UnauthorizedError(
            f"User {user_id} lacks edit permission on knowledge graph {kg_id}"
        )

    try:
        # The lookup must happen inside begin(): assuming the repository
        # queries through this same session, a read issued beforehand
        # autobegins a transaction and a subsequent session.begin() raises
        # InvalidRequestError. Reading here also keeps the
        # read-modify-write in a single transaction.
        async with self._session.begin():
            kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
            # Same message for missing and foreign-tenant graphs so the
            # caller cannot tell them apart.
            if kg is None or kg.tenant_id != self._scope_to_tenant:
                raise ValueError(f"Knowledge graph {kg_id} not found")

            kg.update(name=name, description=description, updated_by=user_id)
            await self._kg_repo.save(kg)
    except IntegrityError as e:
        raise DuplicateKnowledgeGraphNameError(
            f"Knowledge graph '{name}' already exists in tenant"
        ) from e

    # Only report success once the transaction block has exited cleanly.
    self._probe.knowledge_graph_updated(kg_id=kg_id, name=name)
    return kg


async def delete(
    self,
    user_id: str,
    kg_id: str,
) -> bool:
    """Delete a knowledge graph and cascade delete its data sources.

    The lookup, the cascade over data sources (only when a data source
    repository was supplied) and the graph deletion all run inside one
    explicit transaction.

    Args:
        user_id: The user performing the deletion
        kg_id: The knowledge graph ID

    Returns:
        True if deleted, False if not found (or owned by another tenant)

    Raises:
        UnauthorizedError: If user lacks MANAGE permission
    """
    has_manage = await self._check_permission(
        user_id=user_id,
        resource_type=ResourceType.KNOWLEDGE_GRAPH,
        resource_id=kg_id,
        permission=Permission.MANAGE,
    )
    if not has_manage:
        self._probe.permission_denied(
            user_id=user_id,
            resource_id=kg_id,
            permission=Permission.MANAGE,
        )
        raise UnauthorizedError(
            f"User {user_id} lacks manage permission on knowledge graph {kg_id}"
        )

    # As in update(): read inside begin() so the session has not already
    # autobegun a transaction when begin() is called (assumes the
    # repositories share this session).
    async with self._session.begin():
        kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
        if kg is None or kg.tenant_id != self._scope_to_tenant:
            return False

        # Cascade delete data sources if repo is available.
        if self._ds_repo is not None:
            data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id)
            for ds in data_sources:
                ds.mark_for_deletion(deleted_by=user_id)
                await self._ds_repo.delete(ds)

        kg.mark_for_deletion(deleted_by=user_id)
        await self._kg_repo.delete(kg)

    self._probe.knowledge_graph_deleted(kg_id=kg_id)
    return True
def get_data_source_service(
    session: Annotated[AsyncSession, Depends(get_write_session)],
    authz: Annotated[AuthorizationProvider, Depends(get_spicedb_client)],
    current_user: Annotated[CurrentUser, Depends(get_current_user)],
) -> DataSourceService:
    """Get DataSourceService instance.

    Builds the repositories and secret store on top of the request's
    session and scopes the service to the current user's tenant.

    Args:
        session: Async database session for transaction management
        authz: Authorization provider (SpiceDB client)
        current_user: The current user, from which tenant ID is extracted

    Returns:
        DataSourceService instance scoped to the current tenant
    """
    settings = get_management_settings()

    # Both repositories write through the same outbox instance.
    outbox = OutboxRepository(session=session)
    ds_repo = DataSourceRepository(session=session, outbox=outbox)
    kg_repo = KnowledgeGraphRepository(session=session, outbox=outbox)

    # The setting holds a comma-separated list of keys. Strip surrounding
    # whitespace and drop empty entries so a value such as "k1, k2," does
    # not hand an empty or padded string to the secret store.
    raw_keys = settings.encryption_key.get_secret_value()
    encryption_keys = [key.strip() for key in raw_keys.split(",") if key.strip()]

    secret_store = FernetSecretStore(
        session=session,
        encryption_keys=encryption_keys,
    )
    sync_run_repo = DataSourceSyncRunRepository(session=session)

    return DataSourceService(
        session=session,
        data_source_repository=ds_repo,
        knowledge_graph_repository=kg_repo,
        secret_store=secret_store,
        sync_run_repository=sync_run_repo,
        authz=authz,
        scope_to_tenant=current_user.tenant_id.value,
        probe=DefaultDataSourceServiceProbe(),
    )
def get_knowledge_graph_service(
    session: Annotated[AsyncSession, Depends(get_write_session)],
    authz: Annotated[AuthorizationProvider, Depends(get_spicedb_client)],
    current_user: Annotated[CurrentUser, Depends(get_current_user)],
) -> KnowledgeGraphService:
    """Assemble a KnowledgeGraphService for the current request.

    Args:
        session: Async database session for transaction management
        authz: Authorization provider (SpiceDB client)
        current_user: The current user, from which tenant ID is extracted

    Returns:
        KnowledgeGraphService instance scoped to the current tenant
    """
    # One outbox instance is shared by both repositories.
    shared_outbox = OutboxRepository(session=session)
    repositories = {
        "knowledge_graph_repository": KnowledgeGraphRepository(
            session=session, outbox=shared_outbox
        ),
        "data_source_repository": DataSourceRepository(
            session=session, outbox=shared_outbox
        ),
    }
    tenant_scope = current_user.tenant_id.value

    return KnowledgeGraphService(
        session=session,
        authz=authz,
        scope_to_tenant=tenant_scope,
        probe=DefaultKnowledgeGraphServiceProbe(),
        **repositories,
    )
class UnauthorizedError(Exception):
    """Signal that an authorization check rejected the requested operation.

    Raised by application services when the acting user does not hold the
    permission an operation requires. Callers at the API boundary are
    expected to translate it into an HTTP 403 response without exposing
    internal details.
    """
+""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from management.application.services.data_source_service import DataSourceService +from management.domain.aggregates import DataSource, KnowledgeGraph +from management.domain.value_objects import ( + DataSourceId, + KnowledgeGraphId, + Schedule, + ScheduleType, +) +from management.ports.exceptions import UnauthorizedError +from shared_kernel.authorization.types import Permission +from shared_kernel.datasource_types import DataSourceAdapterType + + +@pytest.fixture +def mock_session(): + """Create a mock AsyncSession with begin() context manager.""" + session = MagicMock() + + @asynccontextmanager + async def _begin(): + yield + + session.begin = _begin + return session + + +@pytest.fixture +def mock_ds_repo(): + return AsyncMock() + + +@pytest.fixture +def mock_kg_repo(): + return AsyncMock() + + +@pytest.fixture +def mock_secret_store(): + return AsyncMock() + + +@pytest.fixture +def mock_sync_run_repo(): + return AsyncMock() + + +@pytest.fixture +def mock_authz(): + return AsyncMock() + + +@pytest.fixture +def mock_probe(): + return MagicMock() + + +@pytest.fixture +def tenant_id(): + return "tenant-123" + + +@pytest.fixture +def user_id(): + return "user-456" + + +@pytest.fixture +def kg_id(): + return "kg-789" + + +@pytest.fixture +def service( + mock_session, + mock_ds_repo, + mock_kg_repo, + mock_secret_store, + mock_sync_run_repo, + mock_authz, + mock_probe, + tenant_id, +): + return DataSourceService( + session=mock_session, + data_source_repository=mock_ds_repo, + knowledge_graph_repository=mock_kg_repo, + secret_store=mock_secret_store, + sync_run_repository=mock_sync_run_repo, + authz=mock_authz, + scope_to_tenant=tenant_id, + probe=mock_probe, + ) + + +def _make_kg( + kg_id: str = "kg-789", + tenant_id: str = "tenant-123", + workspace_id: str = "ws-001", +) -> 
KnowledgeGraph: + now = datetime.now(UTC) + kg = KnowledgeGraph( + id=KnowledgeGraphId(value=kg_id), + tenant_id=tenant_id, + workspace_id=workspace_id, + name="Test KG", + description="A test KG", + created_at=now, + updated_at=now, + ) + kg.collect_events() + return kg + + +def _make_ds( + ds_id: str = "ds-001", + kg_id: str = "kg-789", + tenant_id: str = "tenant-123", + name: str = "Test DS", + credentials_path: str | None = None, +) -> DataSource: + now = datetime.now(UTC) + ds = DataSource( + id=DataSourceId(value=ds_id), + knowledge_graph_id=kg_id, + tenant_id=tenant_id, + name=name, + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + credentials_path=credentials_path, + schedule=Schedule(schedule_type=ScheduleType.MANUAL), + last_sync_at=None, + created_at=now, + updated_at=now, + ) + ds.collect_events() + return ds + + +# ---- create ---- + + +class TestDataSourceServiceCreate: + """Tests for DataSourceService.create.""" + + @pytest.mark.asyncio + async def test_create_checks_edit_permission_on_kg( + self, service, mock_authz, user_id, kg_id, mock_kg_repo, tenant_id + ): + """create() must check EDIT permission on the knowledge graph.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg_id}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_create_raises_unauthorized_when_permission_denied( + self, service, mock_authz, mock_probe, user_id, kg_id + ): + """create() raises UnauthorizedError when user lacks EDIT on KG.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + 
await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + ) + + mock_probe.permission_denied.assert_called_once() + + @pytest.mark.asyncio + async def test_create_verifies_kg_exists_and_belongs_to_tenant( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """create() raises ValueError when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + with pytest.raises(ValueError, match="not found"): + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={}, + ) + + @pytest.mark.asyncio + async def test_create_rejects_kg_from_different_tenant( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """create() raises ValueError when KG belongs to different tenant.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg( + kg_id=kg_id, tenant_id="other-tenant" + ) + + with pytest.raises(ValueError, match="different tenant"): + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={}, + ) + + @pytest.mark.asyncio + async def test_create_stores_credentials_when_provided( + self, + service, + mock_authz, + mock_kg_repo, + mock_secret_store, + mock_ds_repo, + user_id, + kg_id, + tenant_id, + ): + """create() stores credentials via secret store when raw_credentials provided.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + creds = {"token": "abc123"} + + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + raw_credentials=creds, + ) + + 
mock_secret_store.store.assert_called_once() + call_kwargs = mock_secret_store.store.call_args.kwargs + assert "datasource/" in call_kwargs.get("path", "") + assert call_kwargs.get("tenant_id") == tenant_id + assert call_kwargs.get("credentials") == creds + + @pytest.mark.asyncio + async def test_create_probes_success( + self, service, mock_authz, mock_kg_repo, mock_probe, user_id, kg_id, tenant_id + ): + """create() calls probe on success.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + + result = await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={}, + ) + + mock_probe.data_source_created.assert_called_once_with( + ds_id=result.id.value, + kg_id=kg_id, + tenant_id=tenant_id, + name="My DS", + ) + + +# ---- get ---- + + +class TestDataSourceServiceGet: + """Tests for DataSourceService.get.""" + + @pytest.mark.asyncio + async def test_get_returns_none_when_not_found( + self, service, mock_ds_repo, user_id + ): + """get() returns None when DS not found.""" + mock_ds_repo.get_by_id.return_value = None + + result = await service.get(user_id=user_id, ds_id="nonexistent") + + assert result is None + + @pytest.mark.asyncio + async def test_get_checks_view_permission( + self, service, mock_authz, mock_ds_repo, user_id + ): + """get() checks VIEW permission on the data source.""" + ds = _make_ds() + mock_ds_repo.get_by_id.return_value = ds + mock_authz.check_permission.return_value = True + + await service.get(user_id=user_id, ds_id=ds.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_get_returns_none_for_different_tenant( + self, service, mock_ds_repo, user_id + ): + """get() returns None when DS belongs to a different tenant.""" + ds = 
_make_ds(tenant_id="other-tenant") + mock_ds_repo.get_by_id.return_value = ds + + result = await service.get(user_id=user_id, ds_id=ds.id.value) + + assert result is None + + @pytest.mark.asyncio + async def test_get_returns_none_when_permission_denied( + self, service, mock_authz, mock_ds_repo, user_id + ): + """get() returns None when user lacks VIEW (no existence leakage).""" + ds = _make_ds() + mock_ds_repo.get_by_id.return_value = ds + mock_authz.check_permission.return_value = False + + result = await service.get(user_id=user_id, ds_id=ds.id.value) + + assert result is None + + @pytest.mark.asyncio + async def test_get_returns_aggregate_on_success( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id + ): + """get() returns the aggregate when authorized.""" + ds = _make_ds() + mock_ds_repo.get_by_id.return_value = ds + mock_authz.check_permission.return_value = True + + result = await service.get(user_id=user_id, ds_id=ds.id.value) + + assert result is ds + mock_probe.data_source_retrieved.assert_called_once_with(ds_id=ds.id.value) + + +# ---- list_for_knowledge_graph ---- + + +class TestDataSourceServiceListForKnowledgeGraph: + """Tests for DataSourceService.list_for_knowledge_graph.""" + + @pytest.mark.asyncio + async def test_list_checks_view_permission_on_kg( + self, service, mock_authz, mock_ds_repo, mock_kg_repo, user_id, kg_id, tenant_id + ): + """list_for_knowledge_graph() checks VIEW on the KG.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + mock_ds_repo.find_by_knowledge_graph.return_value = [] + + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg_id}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_list_raises_unauthorized_when_denied( + self, service, mock_authz, user_id, kg_id + ): + 
"""list_for_knowledge_graph() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + @pytest.mark.asyncio + async def test_list_raises_unauthorized_when_kg_not_found( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """list_for_knowledge_graph() raises UnauthorizedError when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + with pytest.raises(UnauthorizedError, match="not accessible"): + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + @pytest.mark.asyncio + async def test_list_raises_unauthorized_for_different_tenant_kg( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """list_for_knowledge_graph() rejects KG belonging to different tenant.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg( + kg_id=kg_id, tenant_id="other-tenant" + ) + + with pytest.raises(UnauthorizedError, match="not accessible"): + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + @pytest.mark.asyncio + async def test_list_returns_data_sources( + self, + service, + mock_authz, + mock_ds_repo, + mock_kg_repo, + mock_probe, + user_id, + kg_id, + tenant_id, + ): + """list_for_knowledge_graph() returns data sources from repo.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + ds1 = _make_ds(ds_id="ds-001") + ds2 = _make_ds(ds_id="ds-002") + mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + + result = await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + assert len(result) == 2 + mock_probe.data_sources_listed.assert_called_once_with( + kg_id=kg_id, + count=2, + ) + + +# ---- update ---- + + +class TestDataSourceServiceUpdate: + 
"""Tests for DataSourceService.update.""" + + @pytest.mark.asyncio + async def test_update_checks_edit_permission_on_ds( + self, service, mock_authz, mock_ds_repo, user_id + ): + """update() checks EDIT permission on the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + await service.update( + user_id=user_id, + ds_id=ds.id.value, + name="Updated", + connection_config={"url": "https://new.com"}, + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_update_raises_unauthorized_when_denied( + self, service, mock_authz, user_id + ): + """update() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.update( + user_id=user_id, + ds_id="ds-001", + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_raises_value_error_when_not_found( + self, service, mock_authz, mock_ds_repo, user_id + ): + """update() raises ValueError when DS not found.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = None + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + ds_id="nonexistent", + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_rejects_different_tenant( + self, service, mock_authz, mock_ds_repo, user_id + ): + """update() raises ValueError when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + ds_id=ds.id.value, + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_stores_credentials_when_provided( + self, service, mock_authz, 
mock_ds_repo, mock_secret_store, user_id, tenant_id + ): + """update() stores credentials via secret store when raw_credentials provided.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + creds = {"token": "new-token"} + + await service.update( + user_id=user_id, + ds_id=ds.id.value, + raw_credentials=creds, + ) + + mock_secret_store.store.assert_called_once() + call_kwargs = mock_secret_store.store.call_args.kwargs + assert "datasource/" in call_kwargs.get("path", "") + assert call_kwargs.get("tenant_id") == tenant_id + assert call_kwargs.get("credentials") == creds + + @pytest.mark.asyncio + async def test_update_probes_success( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id + ): + """update() probes success when name is updated.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + await service.update( + user_id=user_id, + ds_id=ds.id.value, + name="Updated", + connection_config={"url": "https://new.com"}, + ) + + mock_probe.data_source_updated.assert_called_once_with( + ds_id=ds.id.value, + name="Updated", + ) + + +# ---- delete ---- + + +class TestDataSourceServiceDelete: + """Tests for DataSourceService.delete.""" + + @pytest.mark.asyncio + async def test_delete_checks_manage_permission_on_ds( + self, service, mock_authz, mock_ds_repo, user_id + ): + """delete() checks MANAGE permission on the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + mock_ds_repo.delete.return_value = True + + await service.delete(user_id=user_id, ds_id=ds.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.MANAGE, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_delete_raises_unauthorized_when_denied( + self, service, mock_authz, user_id + ): + """delete() 
raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.delete(user_id=user_id, ds_id="ds-001") + + @pytest.mark.asyncio + async def test_delete_returns_false_when_not_found( + self, service, mock_authz, mock_ds_repo, user_id + ): + """delete() returns False when DS not found.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = None + + result = await service.delete(user_id=user_id, ds_id="nonexistent") + + assert result is False + + @pytest.mark.asyncio + async def test_delete_returns_false_for_different_tenant( + self, service, mock_authz, mock_ds_repo, user_id + ): + """delete() returns False when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + result = await service.delete(user_id=user_id, ds_id=ds.id.value) + + assert result is False + + @pytest.mark.asyncio + async def test_delete_removes_credentials_if_path_exists( + self, service, mock_authz, mock_ds_repo, mock_secret_store, user_id, tenant_id + ): + """delete() deletes credentials from secret store if credentials_path is set.""" + ds = _make_ds(credentials_path="datasource/ds-001/credentials") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + mock_ds_repo.delete.return_value = True + + await service.delete(user_id=user_id, ds_id=ds.id.value) + + mock_secret_store.delete.assert_called_once_with( + path="datasource/ds-001/credentials", + tenant_id=tenant_id, + ) + + @pytest.mark.asyncio + async def test_delete_probes_success( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id + ): + """delete() calls probe on success.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + mock_ds_repo.delete.return_value = True + + await 
service.delete(user_id=user_id, ds_id=ds.id.value) + + mock_probe.data_source_deleted.assert_called_once_with(ds_id=ds.id.value) + + +# ---- trigger_sync ---- + + +class TestDataSourceServiceTriggerSync: + """Tests for DataSourceService.trigger_sync.""" + + @pytest.mark.asyncio + async def test_trigger_sync_checks_manage_permission( + self, service, mock_authz, mock_ds_repo, mock_sync_run_repo, user_id + ): + """trigger_sync() checks MANAGE permission on the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + await service.trigger_sync(user_id=user_id, ds_id=ds.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.MANAGE, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_trigger_sync_raises_unauthorized_when_denied( + self, service, mock_authz, user_id + ): + """trigger_sync() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.trigger_sync(user_id=user_id, ds_id="ds-001") + + @pytest.mark.asyncio + async def test_trigger_sync_raises_value_error_when_not_found( + self, service, mock_authz, mock_ds_repo, user_id + ): + """trigger_sync() raises ValueError when DS not found.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = None + + with pytest.raises(ValueError): + await service.trigger_sync(user_id=user_id, ds_id="nonexistent") + + @pytest.mark.asyncio + async def test_trigger_sync_rejects_different_tenant( + self, service, mock_authz, mock_ds_repo, user_id + ): + """trigger_sync() raises ValueError when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + with pytest.raises(ValueError): + await 
service.trigger_sync(user_id=user_id, ds_id=ds.id.value) + + @pytest.mark.asyncio + async def test_trigger_sync_creates_sync_run_and_saves_ds( + self, service, mock_authz, mock_ds_repo, mock_sync_run_repo, mock_probe, user_id + ): + """trigger_sync() creates a sync run and saves the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + result = await service.trigger_sync(user_id=user_id, ds_id=ds.id.value) + + assert result.data_source_id == ds.id.value + assert result.status == "pending" + mock_sync_run_repo.save.assert_called_once() + mock_ds_repo.save.assert_called_once() + mock_probe.sync_requested.assert_called_once_with(ds_id=ds.id.value) diff --git a/src/api/tests/unit/management/application/test_knowledge_graph_service.py b/src/api/tests/unit/management/application/test_knowledge_graph_service.py new file mode 100644 index 00000000..44e0923f --- /dev/null +++ b/src/api/tests/unit/management/application/test_knowledge_graph_service.py @@ -0,0 +1,624 @@ +"""Unit tests for KnowledgeGraphService. + +Tests verify authorization checks, repository interactions, +transaction management, and observability probe calls. 
+""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest +from sqlalchemy.exc import IntegrityError + +from management.application.services.knowledge_graph_service import ( + KnowledgeGraphService, +) +from management.domain.aggregates import KnowledgeGraph +from management.domain.value_objects import KnowledgeGraphId +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) +from shared_kernel.authorization.types import ( + Permission, + RelationshipTuple, +) + + +@pytest.fixture +def mock_session(): + """Create a mock AsyncSession with begin() context manager.""" + session = MagicMock() + + @asynccontextmanager + async def _begin(): + yield + + session.begin = _begin + return session + + +@pytest.fixture +def mock_kg_repo(): + """Create a mock IKnowledgeGraphRepository.""" + return AsyncMock() + + +@pytest.fixture +def mock_ds_repo(): + """Create a mock IDataSourceRepository.""" + return AsyncMock() + + +@pytest.fixture +def mock_authz(): + """Create a mock AuthorizationProvider.""" + return AsyncMock() + + +@pytest.fixture +def mock_probe(): + """Create a mock KnowledgeGraphServiceProbe.""" + return MagicMock() + + +@pytest.fixture +def tenant_id(): + return "tenant-123" + + +@pytest.fixture +def user_id(): + return "user-456" + + +@pytest.fixture +def workspace_id(): + return "workspace-789" + + +@pytest.fixture +def service( + mock_session, mock_kg_repo, mock_ds_repo, mock_authz, mock_probe, tenant_id +): + """Create a KnowledgeGraphService with mocked dependencies.""" + return KnowledgeGraphService( + session=mock_session, + knowledge_graph_repository=mock_kg_repo, + data_source_repository=mock_ds_repo, + authz=mock_authz, + scope_to_tenant=tenant_id, + probe=mock_probe, + ) + + +def _make_kg( + kg_id: str = "kg-001", + tenant_id: str = "tenant-123", + workspace_id: str = 
"workspace-789", + name: str = "Test KG", + description: str = "A test knowledge graph", +) -> KnowledgeGraph: + """Create a KnowledgeGraph instance for testing.""" + now = datetime.now(UTC) + kg = KnowledgeGraph( + id=KnowledgeGraphId(value=kg_id), + tenant_id=tenant_id, + workspace_id=workspace_id, + name=name, + description=description, + created_at=now, + updated_at=now, + ) + # Clear events from construction + kg.collect_events() + return kg + + +# ---- create ---- + + +class TestKnowledgeGraphServiceCreate: + """Tests for KnowledgeGraphService.create.""" + + @pytest.mark.asyncio + async def test_create_checks_edit_permission_on_workspace( + self, service, mock_authz, user_id, workspace_id + ): + """create() must check EDIT permission on the workspace.""" + mock_authz.check_permission.return_value = True + + await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"workspace:{workspace_id}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_create_raises_unauthorized_when_permission_denied( + self, service, mock_authz, mock_probe, user_id, workspace_id + ): + """create() raises UnauthorizedError when user lacks EDIT on workspace.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + mock_probe.permission_denied.assert_called_once_with( + user_id=user_id, + resource_id=workspace_id, + permission=Permission.EDIT, + ) + + @pytest.mark.asyncio + async def test_create_saves_aggregate_via_repo( + self, service, mock_authz, mock_kg_repo, user_id, workspace_id, tenant_id + ): + """create() saves the KnowledgeGraph aggregate through the repository.""" + mock_authz.check_permission.return_value = True + + result = await service.create( 
+ user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + assert result.name == "My KG" + assert result.description == "desc" + assert result.tenant_id == tenant_id + assert result.workspace_id == workspace_id + mock_kg_repo.save.assert_called_once() + + @pytest.mark.asyncio + async def test_create_probes_success( + self, service, mock_authz, mock_probe, user_id, workspace_id, tenant_id + ): + """create() calls probe on success.""" + mock_authz.check_permission.return_value = True + + result = await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + mock_probe.knowledge_graph_created.assert_called_once_with( + kg_id=result.id.value, + tenant_id=tenant_id, + workspace_id=workspace_id, + name="My KG", + ) + + @pytest.mark.asyncio + async def test_create_raises_duplicate_on_integrity_error( + self, service, mock_authz, mock_kg_repo, user_id, workspace_id + ): + """create() catches IntegrityError and raises DuplicateKnowledgeGraphNameError.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.save.side_effect = IntegrityError( + "INSERT", {}, Exception("uq_knowledge_graphs_tenant_name") + ) + + with pytest.raises(DuplicateKnowledgeGraphNameError): + await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="Duplicate", + description="desc", + ) + + +# ---- get ---- + + +class TestKnowledgeGraphServiceGet: + """Tests for KnowledgeGraphService.get.""" + + @pytest.mark.asyncio + async def test_get_returns_none_when_not_found( + self, service, mock_kg_repo, user_id + ): + """get() returns None when KG is not found.""" + mock_kg_repo.get_by_id.return_value = None + + result = await service.get(user_id=user_id, kg_id="nonexistent") + + assert result is None + + @pytest.mark.asyncio + async def test_get_checks_view_permission( + self, service, mock_authz, mock_kg_repo, user_id + ): + """get() checks VIEW permission on the knowledge graph.""" + kg = 
_make_kg() + mock_kg_repo.get_by_id.return_value = kg + mock_authz.check_permission.return_value = True + + await service.get(user_id=user_id, kg_id=kg.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg.id.value}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_get_returns_none_for_different_tenant( + self, service, mock_kg_repo, user_id + ): + """get() returns None when KG belongs to a different tenant.""" + kg = _make_kg(tenant_id="other-tenant") + mock_kg_repo.get_by_id.return_value = kg + + result = await service.get(user_id=user_id, kg_id=kg.id.value) + + assert result is None + + @pytest.mark.asyncio + async def test_get_returns_none_when_permission_denied( + self, service, mock_authz, mock_kg_repo, user_id + ): + """get() returns None when user lacks VIEW (no existence leakage).""" + kg = _make_kg() + mock_kg_repo.get_by_id.return_value = kg + mock_authz.check_permission.return_value = False + + result = await service.get(user_id=user_id, kg_id=kg.id.value) + + assert result is None + + @pytest.mark.asyncio + async def test_get_returns_aggregate_on_success( + self, service, mock_authz, mock_kg_repo, mock_probe, user_id + ): + """get() returns the aggregate when authorized.""" + kg = _make_kg() + mock_kg_repo.get_by_id.return_value = kg + mock_authz.check_permission.return_value = True + + result = await service.get(user_id=user_id, kg_id=kg.id.value) + + assert result is kg + mock_probe.knowledge_graph_retrieved.assert_called_once_with( + kg_id=kg.id.value, + ) + + +# ---- list_for_workspace ---- + + +class TestKnowledgeGraphServiceListForWorkspace: + """Tests for KnowledgeGraphService.list_for_workspace.""" + + @pytest.mark.asyncio + async def test_list_checks_view_permission_on_workspace( + self, service, mock_authz, user_id, workspace_id + ): + """list_for_workspace() checks VIEW on the workspace.""" + mock_authz.check_permission.return_value = True 
+ mock_authz.read_relationships.return_value = [] + + await service.list_for_workspace(user_id=user_id, workspace_id=workspace_id) + + mock_authz.check_permission.assert_called_once_with( + resource=f"workspace:{workspace_id}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_list_raises_unauthorized_when_permission_denied( + self, service, mock_authz, user_id, workspace_id + ): + """list_for_workspace() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.list_for_workspace(user_id=user_id, workspace_id=workspace_id) + + @pytest.mark.asyncio + async def test_list_uses_read_relationships_to_discover_kgs( + self, + service, + mock_authz, + mock_kg_repo, + mock_probe, + user_id, + workspace_id, + tenant_id, + ): + """list_for_workspace() reads relationships to find KG IDs.""" + mock_authz.check_permission.return_value = True + kg1 = _make_kg(kg_id="kg-001", tenant_id=tenant_id) + kg2 = _make_kg(kg_id="kg-002", tenant_id=tenant_id) + mock_authz.read_relationships.return_value = [ + RelationshipTuple( + resource="knowledge_graph:kg-001", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + RelationshipTuple( + resource="knowledge_graph:kg-002", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + ] + mock_kg_repo.get_by_id.side_effect = [kg1, kg2] + + result = await service.list_for_workspace( + user_id=user_id, workspace_id=workspace_id + ) + + assert len(result) == 2 + mock_probe.knowledge_graphs_listed.assert_called_once_with( + workspace_id=workspace_id, + count=2, + ) + + @pytest.mark.asyncio + async def test_list_filters_by_tenant( + self, service, mock_authz, mock_kg_repo, user_id, workspace_id, tenant_id + ): + """list_for_workspace() filters KGs that don't belong to the scoped tenant.""" + mock_authz.check_permission.return_value = True + kg_own = _make_kg(kg_id="kg-001", 
tenant_id=tenant_id) + kg_other = _make_kg(kg_id="kg-002", tenant_id="other-tenant") + mock_authz.read_relationships.return_value = [ + RelationshipTuple( + resource="knowledge_graph:kg-001", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + RelationshipTuple( + resource="knowledge_graph:kg-002", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + ] + mock_kg_repo.get_by_id.side_effect = [kg_own, kg_other] + + result = await service.list_for_workspace( + user_id=user_id, workspace_id=workspace_id + ) + + assert len(result) == 1 + assert result[0].id.value == "kg-001" + + +# ---- update ---- + + +class TestKnowledgeGraphServiceUpdate: + """Tests for KnowledgeGraphService.update.""" + + @pytest.mark.asyncio + async def test_update_checks_edit_permission_on_kg( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() checks EDIT permission on the knowledge graph.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Updated", + description="Updated desc", + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg.id.value}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_update_raises_unauthorized_when_permission_denied( + self, service, mock_authz, user_id + ): + """update() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.update( + user_id=user_id, + kg_id="kg-001", + name="Updated", + description="Updated desc", + ) + + @pytest.mark.asyncio + async def test_update_raises_value_error_when_not_found( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() raises ValueError when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value 
= None + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + kg_id="nonexistent", + name="Updated", + description="Updated desc", + ) + + @pytest.mark.asyncio + async def test_update_rejects_different_tenant( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() raises ValueError when KG belongs to a different tenant.""" + kg = _make_kg(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Updated", + description="Updated desc", + ) + + @pytest.mark.asyncio + async def test_update_calls_aggregate_update_and_saves( + self, service, mock_authz, mock_kg_repo, mock_probe, user_id + ): + """update() calls kg.update() and saves via repo.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + result = await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Updated", + description="New desc", + ) + + assert result.name == "Updated" + assert result.description == "New desc" + mock_kg_repo.save.assert_called_once_with(kg) + mock_probe.knowledge_graph_updated.assert_called_once_with( + kg_id=kg.id.value, + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_raises_duplicate_on_integrity_error( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() catches IntegrityError and raises DuplicateKnowledgeGraphNameError.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_kg_repo.save.side_effect = IntegrityError( + "UPDATE", {}, Exception("uq_knowledge_graphs_tenant_name") + ) + + with pytest.raises(DuplicateKnowledgeGraphNameError): + await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Duplicate", + description="desc", + ) + + +# ---- delete ---- + + +class 
TestKnowledgeGraphServiceDelete: + """Tests for KnowledgeGraphService.delete.""" + + @pytest.mark.asyncio + async def test_delete_checks_manage_permission_on_kg( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, user_id + ): + """delete() checks MANAGE permission on the knowledge graph.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_kg_repo.delete.return_value = True + + await service.delete(user_id=user_id, kg_id=kg.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg.id.value}", + permission=Permission.MANAGE, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_delete_raises_unauthorized_when_permission_denied( + self, service, mock_authz, user_id + ): + """delete() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.delete(user_id=user_id, kg_id="kg-001") + + @pytest.mark.asyncio + async def test_delete_returns_false_when_not_found( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, user_id + ): + """delete() returns False when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + result = await service.delete(user_id=user_id, kg_id="nonexistent") + + assert result is False + + @pytest.mark.asyncio + async def test_delete_returns_false_for_different_tenant( + self, service, mock_authz, mock_kg_repo, user_id + ): + """delete() returns False when KG belongs to a different tenant.""" + kg = _make_kg(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + result = await service.delete(user_id=user_id, kg_id=kg.id.value) + + assert result is False + + @pytest.mark.asyncio + async def test_delete_cascades_data_sources( + self, 
service, mock_authz, mock_kg_repo, mock_ds_repo, user_id, tenant_id + ): + """delete() deletes all data sources before deleting the KG.""" + kg = _make_kg(tenant_id=tenant_id) + ds1 = MagicMock() + ds2 = MagicMock() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + mock_ds_repo.delete.return_value = True + mock_kg_repo.delete.return_value = True + + result = await service.delete(user_id=user_id, kg_id=kg.id.value) + + assert result is True + # Each DS should be marked for deletion and deleted + ds1.mark_for_deletion.assert_called_once() + ds2.mark_for_deletion.assert_called_once() + assert mock_ds_repo.delete.call_count == 2 + mock_kg_repo.delete.assert_called_once_with(kg) + + @pytest.mark.asyncio + async def test_delete_probes_success( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, mock_probe, user_id + ): + """delete() calls probe on success.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_kg_repo.delete.return_value = True + + await service.delete(user_id=user_id, kg_id=kg.id.value) + + mock_probe.knowledge_graph_deleted.assert_called_once_with( + kg_id=kg.id.value, + ) diff --git a/src/api/tests/unit/management/test_architecture.py b/src/api/tests/unit/management/test_architecture.py index bd1b2489..dd6addae 100644 --- a/src/api/tests/unit/management/test_architecture.py +++ b/src/api/tests/unit/management/test_architecture.py @@ -226,10 +226,15 @@ def test_management_does_not_import_iam(self): IAM manages authentication and authorization. Management should not couple to IAM's user, tenant, or API key domain objects. 
+ + The management.dependencies package is excluded because DI wiring + is a presentation-layer concern that legitimately crosses context + boundaries (e.g., extracting CurrentUser from IAM for tenant scoping). """ ( archrule("management_no_iam") .match("management*") + .exclude("management.dependencies*") .should_not_import("iam*") .check("management") ) diff --git a/src/api/tests/unit/management/test_data_source.py b/src/api/tests/unit/management/test_data_source.py index e56ba6f2..034702d9 100644 --- a/src/api/tests/unit/management/test_data_source.py +++ b/src/api/tests/unit/management/test_data_source.py @@ -11,6 +11,7 @@ from management.domain.events import ( DataSourceCreated, DataSourceDeleted, + DataSourceSyncRequested, DataSourceUpdated, ) from management.domain.exceptions import ( @@ -301,6 +302,53 @@ def test_update_connection_raises_after_deletion(self): ) +class TestDataSourceRequestSync: + """Tests for DataSource.request_sync() method.""" + + def _create_ds(self, **kwargs): + """Helper to create a DataSource and clear creation events.""" + defaults = { + "knowledge_graph_id": "kg-123", + "tenant_id": "tenant-456", + "name": "Source", + "adapter_type": DataSourceAdapterType.GITHUB, + "connection_config": {}, + } + defaults.update(kwargs) + ds = DataSource.create(**defaults) + ds.collect_events() + return ds + + def test_request_sync_emits_sync_requested_event(self): + """request_sync() should emit a DataSourceSyncRequested event.""" + ds = self._create_ds() + ds.request_sync(requested_by="user-abc") + events = ds.collect_events() + assert len(events) == 1 + event = events[0] + assert isinstance(event, DataSourceSyncRequested) + assert event.data_source_id == ds.id.value + assert event.knowledge_graph_id == "kg-123" + assert event.tenant_id == "tenant-456" + assert event.requested_by == "user-abc" + assert event.occurred_at is not None + + def test_request_sync_without_actor(self): + """request_sync() without requested_by should set it to None.""" + ds 
= self._create_ds() + ds.request_sync() + events = ds.collect_events() + assert events[0].requested_by is None + + def test_request_sync_raises_after_deletion(self): + """request_sync() should raise AggregateDeletedError after mark_for_deletion().""" + ds = self._create_ds() + ds.mark_for_deletion() + ds.collect_events() + with pytest.raises(AggregateDeletedError): + ds.request_sync() + + class TestDataSourceRecordSyncCompleted: """Tests for DataSource.record_sync_completed() method.""" diff --git a/src/api/uv.lock b/src/api/uv.lock index d9a98e23..b085bb5d 100644 --- a/src/api/uv.lock +++ b/src/api/uv.lock @@ -1171,7 +1171,7 @@ wheels = [ [[package]] name = "kartograph-api" -version = "3.29.0" +version = "3.30.0" source = { virtual = "." } dependencies = [ { name = "alembic" },