From 28315c5bcb21c25124b4c7c1930d3d13385d2858 Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 16:39:48 -0400 Subject: [PATCH 01/10] feat(management): add application service observability probes Add KnowledgeGraphServiceProbe and DataSourceServiceProbe following the Domain-Oriented Observability pattern established in IAM. Each includes a Protocol interface and DefaultXxxProbe implementation using structlog with _get_context_kwargs for kwarg collision avoidance. Co-Authored-By: Claude Opus 4.6 --- src/api/management/application/__init__.py | 5 + .../application/observability/__init__.py | 20 ++ .../data_source_service_probe.py | 265 ++++++++++++++++++ .../knowledge_graph_service_probe.py | 248 ++++++++++++++++ 4 files changed, 538 insertions(+) create mode 100644 src/api/management/application/__init__.py create mode 100644 src/api/management/application/observability/__init__.py create mode 100644 src/api/management/application/observability/data_source_service_probe.py create mode 100644 src/api/management/application/observability/knowledge_graph_service_probe.py diff --git a/src/api/management/application/__init__.py b/src/api/management/application/__init__.py new file mode 100644 index 00000000..17c235ea --- /dev/null +++ b/src/api/management/application/__init__.py @@ -0,0 +1,5 @@ +"""Management application layer. + +Contains application services that orchestrate domain operations +with authorization, transaction management, and observability. +""" diff --git a/src/api/management/application/observability/__init__.py b/src/api/management/application/observability/__init__.py new file mode 100644 index 00000000..514398f9 --- /dev/null +++ b/src/api/management/application/observability/__init__.py @@ -0,0 +1,20 @@ +"""Domain-Oriented Observability for Management application layer. + +Probes for application service operations following Domain-Oriented Observability patterns. 
class DataSourceServiceProbe(Protocol):
    """Domain probe for data source application service operations.

    Records domain-significant events related to data source operations.
    This is a structural interface: every recording method returns ``None``
    and ``with_context`` returns a probe.
    """

    def data_source_created(
        self, ds_id: str, kg_id: str, tenant_id: str, name: str
    ) -> None:
        """Record data source creation."""
        ...

    def data_source_creation_failed(self, kg_id: str, name: str, error: str) -> None:
        """Record failed data source creation."""
        ...

    def data_source_retrieved(self, ds_id: str) -> None:
        """Record data source retrieval."""
        ...

    def data_source_updated(self, ds_id: str, name: str) -> None:
        """Record data source update."""
        ...

    def data_source_deleted(self, ds_id: str) -> None:
        """Record data source deletion."""
        ...

    def data_source_deletion_failed(self, ds_id: str, error: str) -> None:
        """Record failed data source deletion."""
        ...

    def data_sources_listed(self, kg_id: str, count: int) -> None:
        """Record data sources listed."""
        ...

    def sync_requested(self, ds_id: str) -> None:
        """Record sync requested."""
        ...

    def permission_denied(
        self, user_id: str, resource_id: str, permission: str
    ) -> None:
        """Record permission denied."""
        ...

    def with_context(self, context: ObservationContext) -> DataSourceServiceProbe:
        """Return a new probe with additional context."""
        ...


class DefaultDataSourceServiceProbe:
    """Default implementation of DataSourceServiceProbe using structlog.

    Each probe method logs exactly one named event through the injected
    logger. The method's own arguments are passed as keyword fields and the
    bound observation context (if any) is merged in; on a key clash the
    method's argument wins and the context entry is dropped.
    """

    def __init__(
        self,
        logger: structlog.stdlib.BoundLogger | None = None,
        context: ObservationContext | None = None,
    ):
        """Create a probe.

        Args:
            logger: Logger to emit events on; a structlog logger is obtained
                via ``structlog.get_logger()`` when omitted.
            context: Optional observation context merged into every event.
        """
        # Compare against None instead of relying on truthiness, so an
        # injected logger is never replaced just because it evaluates falsy.
        self._logger = structlog.get_logger() if logger is None else logger
        self._context = context

    def _get_context_kwargs(self, exclude: set[str] | None = None) -> dict[str, Any]:
        """Get context as kwargs dict, excluding specified keys.

        Args:
            exclude: Set of keys to exclude from context (avoids parameter collision)

        Returns:
            Context dict with excluded keys filtered out; empty when no
            context is bound.
        """
        if self._context is None:
            return {}

        context_dict = self._context.as_dict()
        if exclude:
            return {k: v for k, v in context_dict.items() if k not in exclude}
        return context_dict

    def _emit(self, log: Any, event: str, /, **fields: Any) -> None:
        """Log ``event`` with ``fields`` plus any non-conflicting context.

        Args:
            log: Bound logger method to call (e.g. ``self._logger.info``).
            event: Event name, passed positionally to ``log``.
            **fields: Event-specific key/value pairs.
        """
        # The exclusion set is derived from the fields actually being sent so
        # it cannot drift from the keyword list. "event" is reserved as well:
        # the event name is passed positionally, so a context key of that
        # name would raise "multiple values for argument".
        context_kwargs = self._get_context_kwargs(exclude={"event", *fields})
        log(event, **fields, **context_kwargs)

    def with_context(
        self, context: ObservationContext
    ) -> DefaultDataSourceServiceProbe:
        """Create a new probe with observation context bound.

        The new probe shares this probe's logger; this probe is not modified.
        """
        return DefaultDataSourceServiceProbe(logger=self._logger, context=context)

    def data_source_created(
        self,
        ds_id: str,
        kg_id: str,
        tenant_id: str,
        name: str,
    ) -> None:
        """Record data source creation (info level)."""
        self._emit(
            self._logger.info,
            "data_source_created",
            ds_id=ds_id,
            kg_id=kg_id,
            tenant_id=tenant_id,
            name=name,
        )

    def data_source_creation_failed(
        self,
        kg_id: str,
        name: str,
        error: str,
    ) -> None:
        """Record failed data source creation (error level)."""
        self._emit(
            self._logger.error,
            "data_source_creation_failed",
            kg_id=kg_id,
            name=name,
            error=error,
        )

    def data_source_retrieved(
        self,
        ds_id: str,
    ) -> None:
        """Record data source retrieval (debug level)."""
        self._emit(self._logger.debug, "data_source_retrieved", ds_id=ds_id)

    def data_source_updated(
        self,
        ds_id: str,
        name: str,
    ) -> None:
        """Record data source update (info level)."""
        self._emit(
            self._logger.info,
            "data_source_updated",
            ds_id=ds_id,
            name=name,
        )

    def data_source_deleted(
        self,
        ds_id: str,
    ) -> None:
        """Record data source deletion (info level)."""
        self._emit(self._logger.info, "data_source_deleted", ds_id=ds_id)

    def data_source_deletion_failed(
        self,
        ds_id: str,
        error: str,
    ) -> None:
        """Record failed data source deletion (error level)."""
        self._emit(
            self._logger.error,
            "data_source_deletion_failed",
            ds_id=ds_id,
            error=error,
        )

    def data_sources_listed(
        self,
        kg_id: str,
        count: int,
    ) -> None:
        """Record data sources listed (debug level)."""
        self._emit(
            self._logger.debug,
            "data_sources_listed",
            kg_id=kg_id,
            count=count,
        )

    def sync_requested(
        self,
        ds_id: str,
    ) -> None:
        """Record sync requested (info level)."""
        self._emit(self._logger.info, "data_source_sync_requested", ds_id=ds_id)

    def permission_denied(
        self,
        user_id: str,
        resource_id: str,
        permission: str,
    ) -> None:
        """Record permission denied (warning level)."""
        self._emit(
            self._logger.warning,
            "data_source_permission_denied",
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
class KnowledgeGraphServiceProbe(Protocol):
    """Observability interface for the knowledge graph application service.

    Declares one recording hook per domain-significant knowledge graph
    event. Each hook returns ``None``; ``with_context`` returns a probe.
    """

    def knowledge_graph_created(
        self, kg_id: str, tenant_id: str, workspace_id: str, name: str
    ) -> None:
        """Record that a knowledge graph was created."""
        ...

    def knowledge_graph_creation_failed(
        self, tenant_id: str, name: str, error: str
    ) -> None:
        """Record that creating a knowledge graph failed."""
        ...

    def knowledge_graph_retrieved(self, kg_id: str) -> None:
        """Record that a knowledge graph was retrieved."""
        ...

    def knowledge_graph_updated(self, kg_id: str, name: str) -> None:
        """Record that a knowledge graph was updated."""
        ...

    def knowledge_graph_deleted(self, kg_id: str) -> None:
        """Record that a knowledge graph was deleted."""
        ...

    def knowledge_graph_deletion_failed(self, kg_id: str, error: str) -> None:
        """Record that deleting a knowledge graph failed."""
        ...

    def knowledge_graphs_listed(self, workspace_id: str, count: int) -> None:
        """Record that knowledge graphs were listed, with the result count."""
        ...

    def permission_denied(
        self, user_id: str, resource_id: str, permission: str
    ) -> None:
        """Record that a permission check was denied."""
        ...

    def with_context(self, context: ObservationContext) -> KnowledgeGraphServiceProbe:
        """Return a new probe carrying the given observation context."""
        ...
class DefaultKnowledgeGraphServiceProbe:
    """Default implementation of KnowledgeGraphServiceProbe using structlog.

    Each probe method logs exactly one named event through the injected
    logger. The method's own arguments are passed as keyword fields and the
    bound observation context (if any) is merged in; on a key clash the
    method's argument wins and the context entry is dropped.
    """

    def __init__(
        self,
        logger: structlog.stdlib.BoundLogger | None = None,
        context: ObservationContext | None = None,
    ):
        """Create a probe.

        Args:
            logger: Logger to emit events on; a structlog logger is obtained
                via ``structlog.get_logger()`` when omitted.
            context: Optional observation context merged into every event.
        """
        # Compare against None instead of relying on truthiness, so an
        # injected logger is never replaced just because it evaluates falsy.
        self._logger = structlog.get_logger() if logger is None else logger
        self._context = context

    def _get_context_kwargs(self, exclude: set[str] | None = None) -> dict[str, Any]:
        """Get context as kwargs dict, excluding specified keys.

        Args:
            exclude: Set of keys to exclude from context (avoids parameter collision)

        Returns:
            Context dict with excluded keys filtered out; empty when no
            context is bound.
        """
        if self._context is None:
            return {}

        context_dict = self._context.as_dict()
        if exclude:
            return {k: v for k, v in context_dict.items() if k not in exclude}
        return context_dict

    def _emit(self, log: Any, event: str, /, **fields: Any) -> None:
        """Log ``event`` with ``fields`` plus any non-conflicting context.

        Args:
            log: Bound logger method to call (e.g. ``self._logger.info``).
            event: Event name, passed positionally to ``log``.
            **fields: Event-specific key/value pairs.
        """
        # The exclusion set is derived from the fields actually being sent so
        # it cannot drift from the keyword list. "event" is reserved as well:
        # the event name is passed positionally, so a context key of that
        # name would raise "multiple values for argument".
        context_kwargs = self._get_context_kwargs(exclude={"event", *fields})
        log(event, **fields, **context_kwargs)

    def with_context(
        self, context: ObservationContext
    ) -> DefaultKnowledgeGraphServiceProbe:
        """Create a new probe with observation context bound.

        The new probe shares this probe's logger; this probe is not modified.
        """
        return DefaultKnowledgeGraphServiceProbe(logger=self._logger, context=context)

    def knowledge_graph_created(
        self,
        kg_id: str,
        tenant_id: str,
        workspace_id: str,
        name: str,
    ) -> None:
        """Record knowledge graph creation (info level)."""
        self._emit(
            self._logger.info,
            "knowledge_graph_created",
            kg_id=kg_id,
            tenant_id=tenant_id,
            workspace_id=workspace_id,
            name=name,
        )

    def knowledge_graph_creation_failed(
        self,
        tenant_id: str,
        name: str,
        error: str,
    ) -> None:
        """Record failed knowledge graph creation (error level)."""
        self._emit(
            self._logger.error,
            "knowledge_graph_creation_failed",
            tenant_id=tenant_id,
            name=name,
            error=error,
        )

    def knowledge_graph_retrieved(
        self,
        kg_id: str,
    ) -> None:
        """Record knowledge graph retrieval (debug level)."""
        self._emit(self._logger.debug, "knowledge_graph_retrieved", kg_id=kg_id)

    def knowledge_graph_updated(
        self,
        kg_id: str,
        name: str,
    ) -> None:
        """Record knowledge graph update (info level)."""
        self._emit(
            self._logger.info,
            "knowledge_graph_updated",
            kg_id=kg_id,
            name=name,
        )

    def knowledge_graph_deleted(
        self,
        kg_id: str,
    ) -> None:
        """Record knowledge graph deletion (info level)."""
        self._emit(self._logger.info, "knowledge_graph_deleted", kg_id=kg_id)

    def knowledge_graph_deletion_failed(
        self,
        kg_id: str,
        error: str,
    ) -> None:
        """Record failed knowledge graph deletion (error level)."""
        self._emit(
            self._logger.error,
            "knowledge_graph_deletion_failed",
            kg_id=kg_id,
            error=error,
        )

    def knowledge_graphs_listed(
        self,
        workspace_id: str,
        count: int,
    ) -> None:
        """Record knowledge graphs listed (debug level)."""
        self._emit(
            self._logger.debug,
            "knowledge_graphs_listed",
            workspace_id=workspace_id,
            count=count,
        )

    def permission_denied(
        self,
        user_id: str,
        resource_id: str,
        permission: str,
    ) -> None:
        """Record permission denied (warning level)."""
        self._emit(
            self._logger.warning,
            "knowledge_graph_permission_denied",
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
implementing CRUD operations with SpiceDB permission checks (EDIT, VIEW, MANAGE). Includes cascade delete of DataSources, read_relationships for workspace-scoped listing, and IntegrityError-to-domain exception mapping. Also adds UnauthorizedError to management ports. Co-Authored-By: Claude Opus 4.6 --- .../application/services/__init__.py | 15 + .../services/knowledge_graph_service.py | 381 ++++++++++++ src/api/management/ports/exceptions.py | 11 + .../unit/management/application/__init__.py | 0 .../test_knowledge_graph_service.py | 581 ++++++++++++++++++ 5 files changed, 988 insertions(+) create mode 100644 src/api/management/application/services/__init__.py create mode 100644 src/api/management/application/services/knowledge_graph_service.py create mode 100644 src/api/tests/unit/management/application/__init__.py create mode 100644 src/api/tests/unit/management/application/test_knowledge_graph_service.py diff --git a/src/api/management/application/services/__init__.py b/src/api/management/application/services/__init__.py new file mode 100644 index 00000000..73d75fc6 --- /dev/null +++ b/src/api/management/application/services/__init__.py @@ -0,0 +1,15 @@ +"""Management application services. + +Application services orchestrate domain operations with proper +authorization, transaction management, and observability. +""" + +from management.application.services.data_source_service import DataSourceService +from management.application.services.knowledge_graph_service import ( + KnowledgeGraphService, +) + +__all__ = [ + "DataSourceService", + "KnowledgeGraphService", +] diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py new file mode 100644 index 00000000..ede630ef --- /dev/null +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -0,0 +1,381 @@ +"""KnowledgeGraph application service for Management bounded context. 
class KnowledgeGraphService:
    """Application service for knowledge graph management.

    Orchestrates knowledge graph operations with proper tenant scoping,
    authorization checks, and business rule enforcement.

    Every operation that loads a knowledge graph by ID treats a graph that
    belongs to a tenant other than ``scope_to_tenant`` as "not found", so the
    single-item operations apply the same tenant filter as
    ``list_for_workspace``.
    """

    def __init__(
        self,
        session: AsyncSession,
        knowledge_graph_repository: IKnowledgeGraphRepository,
        authz: AuthorizationProvider,
        scope_to_tenant: str,
        probe: KnowledgeGraphServiceProbe | None = None,
        data_source_repository: IDataSourceRepository | None = None,
    ) -> None:
        """Initialize KnowledgeGraphService with dependencies.

        Args:
            session: Database session for transaction management
            knowledge_graph_repository: Repository for KG persistence
            authz: Authorization provider for permission checks
            scope_to_tenant: Tenant ID string to scope this service to
            probe: Optional domain probe for observability
            data_source_repository: Optional DS repository for cascade delete
        """
        self._session = session
        self._kg_repo = knowledge_graph_repository
        self._authz = authz
        self._scope_to_tenant = scope_to_tenant
        # Explicit None check: an injected probe must never be replaced just
        # because it evaluates as falsy.
        self._probe = DefaultKnowledgeGraphServiceProbe() if probe is None else probe
        self._ds_repo = data_source_repository

    async def _check_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        permission: Permission,
    ) -> bool:
        """Check if user has permission on a resource.

        Args:
            user_id: The user to check
            resource_type: Type of resource
            resource_id: ID of the resource
            permission: The permission to check

        Returns:
            True if user has permission, False otherwise
        """
        resource = format_resource(resource_type, resource_id)
        subject = format_subject(ResourceType.USER, user_id)
        return await self._authz.check_permission(
            resource=resource,
            permission=permission,
            subject=subject,
        )

    async def _require_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        permission: Permission,
        denial_message: str,
    ) -> None:
        """Raise UnauthorizedError unless the user holds the permission.

        On denial the ``permission_denied`` probe is emitted before raising.

        Args:
            user_id: The user to check
            resource_type: Type of resource
            resource_id: ID of the resource
            permission: The permission to check
            denial_message: Message for the raised UnauthorizedError

        Raises:
            UnauthorizedError: If the permission check returns False
        """
        if await self._check_permission(
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            permission=permission,
        ):
            return

        self._probe.permission_denied(
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
        raise UnauthorizedError(denial_message)

    async def _get_in_tenant(self, kg_id: str) -> KnowledgeGraph | None:
        """Load a knowledge graph by ID, hiding graphs owned by other tenants.

        Returns:
            The aggregate, or None when it does not exist or its
            ``tenant_id`` differs from this service's tenant scope.
        """
        kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
        if kg is None or kg.tenant_id != self._scope_to_tenant:
            return None
        return kg

    async def create(
        self,
        user_id: str,
        workspace_id: str,
        name: str,
        description: str,
    ) -> KnowledgeGraph:
        """Create a new knowledge graph in a workspace.

        Args:
            user_id: The user creating the KG
            workspace_id: The workspace to create the KG in
            name: Name of the knowledge graph
            description: Description of the knowledge graph

        Returns:
            The created KnowledgeGraph aggregate

        Raises:
            UnauthorizedError: If user lacks EDIT permission on workspace
            DuplicateKnowledgeGraphNameError: If name already exists in tenant
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.WORKSPACE,
            resource_id=workspace_id,
            permission=Permission.EDIT,
            denial_message=(
                f"User {user_id} lacks edit permission on workspace {workspace_id}"
            ),
        )

        try:
            async with self._session.begin():
                kg = KnowledgeGraph.create(
                    tenant_id=self._scope_to_tenant,
                    workspace_id=workspace_id,
                    name=name,
                    description=description,
                    created_by=user_id,
                )
                await self._kg_repo.save(kg)
        except IntegrityError as e:
            self._probe.knowledge_graph_creation_failed(
                tenant_id=self._scope_to_tenant,
                name=name,
                error=str(e),
            )
            raise DuplicateKnowledgeGraphNameError(
                f"Knowledge graph '{name}' already exists in tenant"
            ) from e

        # Emitted only after the transaction block has exited cleanly, so a
        # failure raised while leaving the block can no longer produce a
        # "created" event followed by "creation_failed" for the same request.
        self._probe.knowledge_graph_created(
            kg_id=kg.id.value,
            tenant_id=self._scope_to_tenant,
            workspace_id=workspace_id,
            name=name,
        )
        return kg

    async def get(
        self,
        user_id: str,
        kg_id: str,
    ) -> KnowledgeGraph | None:
        """Get a knowledge graph by ID with authorization check.

        Args:
            user_id: The user requesting access
            kg_id: The knowledge graph ID

        Returns:
            The KnowledgeGraph aggregate, or None if it does not exist in
            this service's tenant

        Raises:
            UnauthorizedError: If user lacks VIEW permission
        """
        kg = await self._get_in_tenant(kg_id)
        if kg is None:
            return None

        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.VIEW,
            denial_message=(
                f"User {user_id} lacks view permission on knowledge graph {kg_id}"
            ),
        )

        self._probe.knowledge_graph_retrieved(kg_id=kg_id)
        return kg

    async def list_for_workspace(
        self,
        user_id: str,
        workspace_id: str,
    ) -> list[KnowledgeGraph]:
        """List knowledge graphs in a workspace.

        Uses read_relationships to discover KG IDs linked to the workspace,
        then fetches each from the repository and filters by tenant.

        Args:
            user_id: The user requesting the list
            workspace_id: The workspace to list KGs for

        Returns:
            List of KnowledgeGraph aggregates

        Raises:
            UnauthorizedError: If user lacks VIEW permission on workspace
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.WORKSPACE,
            resource_id=workspace_id,
            permission=Permission.VIEW,
            denial_message=(
                f"User {user_id} lacks view permission on workspace {workspace_id}"
            ),
        )

        # Read explicit tuples to discover KG IDs linked to this workspace
        tuples = await self._authz.read_relationships(
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            relation=RelationType.WORKSPACE,
            subject_type=ResourceType.WORKSPACE,
            subject_id=workspace_id,
        )

        # Resource strings have the form "knowledge_graph:ID"; anything that
        # does not split into exactly two parts is ignored.
        split_resources = (rel_tuple.resource.split(":") for rel_tuple in tuples)
        kg_ids = [parts[1] for parts in split_resources if len(parts) == 2]

        # Fetched one at a time; graphs that are missing or belong to
        # another tenant are left out of the result.
        kgs: list[KnowledgeGraph] = []
        for kg_id in kg_ids:
            kg = await self._get_in_tenant(kg_id)
            if kg is not None:
                kgs.append(kg)

        self._probe.knowledge_graphs_listed(
            workspace_id=workspace_id,
            count=len(kgs),
        )

        return kgs

    async def update(
        self,
        user_id: str,
        kg_id: str,
        name: str,
        description: str,
    ) -> KnowledgeGraph:
        """Update a knowledge graph's metadata.

        Args:
            user_id: The user performing the update
            kg_id: The knowledge graph ID
            name: New name
            description: New description

        Returns:
            The updated KnowledgeGraph aggregate

        Raises:
            UnauthorizedError: If user lacks EDIT permission
            ValueError: If KG not found in this service's tenant
            DuplicateKnowledgeGraphNameError: If name already exists
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.EDIT,
            denial_message=(
                f"User {user_id} lacks edit permission on knowledge graph {kg_id}"
            ),
        )

        kg = await self._get_in_tenant(kg_id)
        if kg is None:
            raise ValueError(f"Knowledge graph {kg_id} not found")

        kg.update(name=name, description=description, updated_by=user_id)

        try:
            async with self._session.begin():
                await self._kg_repo.save(kg)
        except IntegrityError as e:
            raise DuplicateKnowledgeGraphNameError(
                f"Knowledge graph '{name}' already exists in tenant"
            ) from e

        self._probe.knowledge_graph_updated(kg_id=kg_id, name=name)

        return kg

    async def delete(
        self,
        user_id: str,
        kg_id: str,
    ) -> bool:
        """Delete a knowledge graph and cascade delete its data sources.

        Data sources are only cascaded when a data source repository was
        supplied at construction time.

        Args:
            user_id: The user performing the deletion
            kg_id: The knowledge graph ID

        Returns:
            True if deleted, False if not found in this service's tenant

        Raises:
            UnauthorizedError: If user lacks MANAGE permission
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.MANAGE,
            denial_message=(
                f"User {user_id} lacks manage permission on knowledge graph {kg_id}"
            ),
        )

        kg = await self._get_in_tenant(kg_id)
        if kg is None:
            return False

        try:
            async with self._session.begin():
                # Cascade delete data sources if repo is available
                if self._ds_repo is not None:
                    data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id)
                    for ds in data_sources:
                        ds.mark_for_deletion(deleted_by=user_id)
                        await self._ds_repo.delete(ds)

                kg.mark_for_deletion(deleted_by=user_id)
                await self._kg_repo.delete(kg)
        except Exception as e:
            # Observability only: report the failure through the probe hook
            # that exists for it, then let the original error propagate.
            self._probe.knowledge_graph_deletion_failed(kg_id=kg_id, error=str(e))
            raise

        self._probe.knowledge_graph_deleted(kg_id=kg_id)

        return True
+ """ + + pass diff --git a/src/api/tests/unit/management/application/__init__.py b/src/api/tests/unit/management/application/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/tests/unit/management/application/test_knowledge_graph_service.py b/src/api/tests/unit/management/application/test_knowledge_graph_service.py new file mode 100644 index 00000000..36d9385b --- /dev/null +++ b/src/api/tests/unit/management/application/test_knowledge_graph_service.py @@ -0,0 +1,581 @@ +"""Unit tests for KnowledgeGraphService. + +Tests verify authorization checks, repository interactions, +transaction management, and observability probe calls. +""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest +from sqlalchemy.exc import IntegrityError + +from management.application.services.knowledge_graph_service import ( + KnowledgeGraphService, +) +from management.domain.aggregates import KnowledgeGraph +from management.domain.value_objects import KnowledgeGraphId +from management.ports.exceptions import ( + DuplicateKnowledgeGraphNameError, + UnauthorizedError, +) +from shared_kernel.authorization.types import ( + Permission, + RelationshipTuple, +) + + +@pytest.fixture +def mock_session(): + """Create a mock AsyncSession with begin() context manager.""" + session = MagicMock() + + @asynccontextmanager + async def _begin(): + yield + + session.begin = _begin + return session + + +@pytest.fixture +def mock_kg_repo(): + """Create a mock IKnowledgeGraphRepository.""" + return AsyncMock() + + +@pytest.fixture +def mock_ds_repo(): + """Create a mock IDataSourceRepository.""" + return AsyncMock() + + +@pytest.fixture +def mock_authz(): + """Create a mock AuthorizationProvider.""" + return AsyncMock() + + +@pytest.fixture +def mock_probe(): + """Create a mock KnowledgeGraphServiceProbe.""" + return MagicMock() + + +@pytest.fixture 
+def tenant_id(): + return "tenant-123" + + +@pytest.fixture +def user_id(): + return "user-456" + + +@pytest.fixture +def workspace_id(): + return "workspace-789" + + +@pytest.fixture +def service( + mock_session, mock_kg_repo, mock_ds_repo, mock_authz, mock_probe, tenant_id +): + """Create a KnowledgeGraphService with mocked dependencies.""" + return KnowledgeGraphService( + session=mock_session, + knowledge_graph_repository=mock_kg_repo, + data_source_repository=mock_ds_repo, + authz=mock_authz, + scope_to_tenant=tenant_id, + probe=mock_probe, + ) + + +def _make_kg( + kg_id: str = "kg-001", + tenant_id: str = "tenant-123", + workspace_id: str = "workspace-789", + name: str = "Test KG", + description: str = "A test knowledge graph", +) -> KnowledgeGraph: + """Create a KnowledgeGraph instance for testing.""" + now = datetime.now(UTC) + kg = KnowledgeGraph( + id=KnowledgeGraphId(value=kg_id), + tenant_id=tenant_id, + workspace_id=workspace_id, + name=name, + description=description, + created_at=now, + updated_at=now, + ) + # Clear events from construction + kg.collect_events() + return kg + + +# ---- create ---- + + +class TestKnowledgeGraphServiceCreate: + """Tests for KnowledgeGraphService.create.""" + + @pytest.mark.asyncio + async def test_create_checks_edit_permission_on_workspace( + self, service, mock_authz, user_id, workspace_id + ): + """create() must check EDIT permission on the workspace.""" + mock_authz.check_permission.return_value = True + + await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"workspace:{workspace_id}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_create_raises_unauthorized_when_permission_denied( + self, service, mock_authz, mock_probe, user_id, workspace_id + ): + """create() raises UnauthorizedError when user lacks EDIT on workspace.""" + 
mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + mock_probe.permission_denied.assert_called_once_with( + user_id=user_id, + resource_id=workspace_id, + permission=Permission.EDIT, + ) + + @pytest.mark.asyncio + async def test_create_saves_aggregate_via_repo( + self, service, mock_authz, mock_kg_repo, user_id, workspace_id, tenant_id + ): + """create() saves the KnowledgeGraph aggregate through the repository.""" + mock_authz.check_permission.return_value = True + + result = await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + assert result.name == "My KG" + assert result.description == "desc" + assert result.tenant_id == tenant_id + assert result.workspace_id == workspace_id + mock_kg_repo.save.assert_called_once() + + @pytest.mark.asyncio + async def test_create_probes_success( + self, service, mock_authz, mock_probe, user_id, workspace_id, tenant_id + ): + """create() calls probe on success.""" + mock_authz.check_permission.return_value = True + + result = await service.create( + user_id=user_id, + workspace_id=workspace_id, + name="My KG", + description="desc", + ) + + mock_probe.knowledge_graph_created.assert_called_once_with( + kg_id=result.id.value, + tenant_id=tenant_id, + workspace_id=workspace_id, + name="My KG", + ) + + @pytest.mark.asyncio + async def test_create_raises_duplicate_on_integrity_error( + self, service, mock_authz, mock_kg_repo, user_id, workspace_id + ): + """create() catches IntegrityError and raises DuplicateKnowledgeGraphNameError.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.save.side_effect = IntegrityError( + "INSERT", {}, Exception("uq_knowledge_graphs_tenant_name") + ) + + with pytest.raises(DuplicateKnowledgeGraphNameError): + await service.create( + user_id=user_id, + 
workspace_id=workspace_id, + name="Duplicate", + description="desc", + ) + + +# ---- get ---- + + +class TestKnowledgeGraphServiceGet: + """Tests for KnowledgeGraphService.get.""" + + @pytest.mark.asyncio + async def test_get_returns_none_when_not_found( + self, service, mock_kg_repo, user_id + ): + """get() returns None when KG is not found.""" + mock_kg_repo.get_by_id.return_value = None + + result = await service.get(user_id=user_id, kg_id="nonexistent") + + assert result is None + + @pytest.mark.asyncio + async def test_get_checks_view_permission( + self, service, mock_authz, mock_kg_repo, user_id + ): + """get() checks VIEW permission on the knowledge graph.""" + kg = _make_kg() + mock_kg_repo.get_by_id.return_value = kg + mock_authz.check_permission.return_value = True + + await service.get(user_id=user_id, kg_id=kg.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg.id.value}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_get_raises_unauthorized_when_permission_denied( + self, service, mock_authz, mock_kg_repo, user_id + ): + """get() raises UnauthorizedError when user lacks VIEW.""" + kg = _make_kg() + mock_kg_repo.get_by_id.return_value = kg + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.get(user_id=user_id, kg_id=kg.id.value) + + @pytest.mark.asyncio + async def test_get_returns_aggregate_on_success( + self, service, mock_authz, mock_kg_repo, mock_probe, user_id + ): + """get() returns the aggregate when authorized.""" + kg = _make_kg() + mock_kg_repo.get_by_id.return_value = kg + mock_authz.check_permission.return_value = True + + result = await service.get(user_id=user_id, kg_id=kg.id.value) + + assert result is kg + mock_probe.knowledge_graph_retrieved.assert_called_once_with( + kg_id=kg.id.value, + ) + + +# ---- list_for_workspace ---- + + +class 
TestKnowledgeGraphServiceListForWorkspace: + """Tests for KnowledgeGraphService.list_for_workspace.""" + + @pytest.mark.asyncio + async def test_list_checks_view_permission_on_workspace( + self, service, mock_authz, user_id, workspace_id + ): + """list_for_workspace() checks VIEW on the workspace.""" + mock_authz.check_permission.return_value = True + mock_authz.read_relationships.return_value = [] + + await service.list_for_workspace(user_id=user_id, workspace_id=workspace_id) + + mock_authz.check_permission.assert_called_once_with( + resource=f"workspace:{workspace_id}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_list_raises_unauthorized_when_permission_denied( + self, service, mock_authz, user_id, workspace_id + ): + """list_for_workspace() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.list_for_workspace(user_id=user_id, workspace_id=workspace_id) + + @pytest.mark.asyncio + async def test_list_uses_read_relationships_to_discover_kgs( + self, + service, + mock_authz, + mock_kg_repo, + mock_probe, + user_id, + workspace_id, + tenant_id, + ): + """list_for_workspace() reads relationships to find KG IDs.""" + mock_authz.check_permission.return_value = True + kg1 = _make_kg(kg_id="kg-001", tenant_id=tenant_id) + kg2 = _make_kg(kg_id="kg-002", tenant_id=tenant_id) + mock_authz.read_relationships.return_value = [ + RelationshipTuple( + resource="knowledge_graph:kg-001", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + RelationshipTuple( + resource="knowledge_graph:kg-002", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + ] + mock_kg_repo.get_by_id.side_effect = [kg1, kg2] + + result = await service.list_for_workspace( + user_id=user_id, workspace_id=workspace_id + ) + + assert len(result) == 2 + mock_probe.knowledge_graphs_listed.assert_called_once_with( + 
workspace_id=workspace_id, + count=2, + ) + + @pytest.mark.asyncio + async def test_list_filters_by_tenant( + self, service, mock_authz, mock_kg_repo, user_id, workspace_id, tenant_id + ): + """list_for_workspace() filters KGs that don't belong to the scoped tenant.""" + mock_authz.check_permission.return_value = True + kg_own = _make_kg(kg_id="kg-001", tenant_id=tenant_id) + kg_other = _make_kg(kg_id="kg-002", tenant_id="other-tenant") + mock_authz.read_relationships.return_value = [ + RelationshipTuple( + resource="knowledge_graph:kg-001", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + RelationshipTuple( + resource="knowledge_graph:kg-002", + relation="workspace", + subject=f"workspace:{workspace_id}", + ), + ] + mock_kg_repo.get_by_id.side_effect = [kg_own, kg_other] + + result = await service.list_for_workspace( + user_id=user_id, workspace_id=workspace_id + ) + + assert len(result) == 1 + assert result[0].id.value == "kg-001" + + +# ---- update ---- + + +class TestKnowledgeGraphServiceUpdate: + """Tests for KnowledgeGraphService.update.""" + + @pytest.mark.asyncio + async def test_update_checks_edit_permission_on_kg( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() checks EDIT permission on the knowledge graph.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Updated", + description="Updated desc", + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg.id.value}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_update_raises_unauthorized_when_permission_denied( + self, service, mock_authz, user_id + ): + """update() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.update( + 
user_id=user_id, + kg_id="kg-001", + name="Updated", + description="Updated desc", + ) + + @pytest.mark.asyncio + async def test_update_raises_value_error_when_not_found( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() raises ValueError when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + kg_id="nonexistent", + name="Updated", + description="Updated desc", + ) + + @pytest.mark.asyncio + async def test_update_calls_aggregate_update_and_saves( + self, service, mock_authz, mock_kg_repo, mock_probe, user_id + ): + """update() calls kg.update() and saves via repo.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + result = await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Updated", + description="New desc", + ) + + assert result.name == "Updated" + assert result.description == "New desc" + mock_kg_repo.save.assert_called_once_with(kg) + mock_probe.knowledge_graph_updated.assert_called_once_with( + kg_id=kg.id.value, + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_raises_duplicate_on_integrity_error( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() catches IntegrityError and raises DuplicateKnowledgeGraphNameError.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_kg_repo.save.side_effect = IntegrityError( + "UPDATE", {}, Exception("uq_knowledge_graphs_tenant_name") + ) + + with pytest.raises(DuplicateKnowledgeGraphNameError): + await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Duplicate", + description="desc", + ) + + +# ---- delete ---- + + +class TestKnowledgeGraphServiceDelete: + """Tests for KnowledgeGraphService.delete.""" + + @pytest.mark.asyncio + async def 
test_delete_checks_manage_permission_on_kg( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, user_id + ): + """delete() checks MANAGE permission on the knowledge graph.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_kg_repo.delete.return_value = True + + await service.delete(user_id=user_id, kg_id=kg.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg.id.value}", + permission=Permission.MANAGE, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_delete_raises_unauthorized_when_permission_denied( + self, service, mock_authz, user_id + ): + """delete() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.delete(user_id=user_id, kg_id="kg-001") + + @pytest.mark.asyncio + async def test_delete_returns_false_when_not_found( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, user_id + ): + """delete() returns False when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + result = await service.delete(user_id=user_id, kg_id="nonexistent") + + assert result is False + + @pytest.mark.asyncio + async def test_delete_cascades_data_sources( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, user_id, tenant_id + ): + """delete() deletes all data sources before deleting the KG.""" + kg = _make_kg(tenant_id=tenant_id) + ds1 = MagicMock() + ds2 = MagicMock() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + mock_ds_repo.delete.return_value = True + mock_kg_repo.delete.return_value = True + + result = await service.delete(user_id=user_id, kg_id=kg.id.value) + + assert result is True + # Each DS 
should be marked for deletion and deleted + ds1.mark_for_deletion.assert_called_once() + ds2.mark_for_deletion.assert_called_once() + assert mock_ds_repo.delete.call_count == 2 + mock_kg_repo.delete.assert_called_once_with(kg) + + @pytest.mark.asyncio + async def test_delete_probes_success( + self, service, mock_authz, mock_kg_repo, mock_ds_repo, mock_probe, user_id + ): + """delete() calls probe on success.""" + kg = _make_kg() + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + mock_ds_repo.find_by_knowledge_graph.return_value = [] + mock_kg_repo.delete.return_value = True + + await service.delete(user_id=user_id, kg_id=kg.id.value) + + mock_probe.knowledge_graph_deleted.assert_called_once_with( + kg_id=kg.id.value, + ) From 0077cb31f00e51db834ad731119bddd939592399 Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 16:40:34 -0400 Subject: [PATCH 03/10] feat(management): add DataSourceService with SpiceDB authorization Add DataSourceService application service implementing CRUD + trigger_sync operations with SpiceDB permission checks. Handles credential storage via ISecretStoreRepository, KG existence/tenant validation, sync run creation with DataSourceSyncRequested event emission. Co-Authored-By: Claude Opus 4.6 --- .../services/data_source_service.py | 460 +++++++++++++ .../application/test_data_source_service.py | 637 ++++++++++++++++++ 2 files changed, 1097 insertions(+) create mode 100644 src/api/management/application/services/data_source_service.py create mode 100644 src/api/tests/unit/management/application/test_data_source_service.py diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py new file mode 100644 index 00000000..41ead1bc --- /dev/null +++ b/src/api/management/application/services/data_source_service.py @@ -0,0 +1,460 @@ +"""DataSource application service for Management bounded context. 
"""DataSource application service for Management bounded context.

Orchestrates data source operations with proper authorization,
credential management, transaction management, and observability.
"""

from __future__ import annotations

from datetime import UTC, datetime

from sqlalchemy.ext.asyncio import AsyncSession

from management.application.observability import (
    DataSourceServiceProbe,
    DefaultDataSourceServiceProbe,
)
from management.domain.aggregates import DataSource
from management.domain.entities import DataSourceSyncRun
from management.domain.events import DataSourceSyncRequested
from management.domain.value_objects import DataSourceId, KnowledgeGraphId
from management.ports.exceptions import UnauthorizedError
from management.ports.repositories import (
    IDataSourceRepository,
    IDataSourceSyncRunRepository,
    IKnowledgeGraphRepository,
)
from management.ports.secret_store import ISecretStoreRepository
from shared_kernel.authorization.protocols import AuthorizationProvider
from shared_kernel.authorization.types import (
    Permission,
    ResourceType,
    format_resource,
    format_subject,
)
from shared_kernel.datasource_types import DataSourceAdapterType


class DataSourceService:
    """Application service for data source management.

    Orchestrates data source operations with authorization checks,
    credential management, transactions, and observability.

    Only ``create()`` verifies tenant ownership explicitly (of the parent
    knowledge graph). Every other operation relies on the authorization
    check alone.
    NOTE(review): confirm that relying on authz for tenant isolation in
    get/update/delete/trigger_sync is intended.
    """

    def __init__(
        self,
        session: AsyncSession,
        data_source_repository: IDataSourceRepository,
        knowledge_graph_repository: IKnowledgeGraphRepository,
        secret_store: ISecretStoreRepository,
        sync_run_repository: IDataSourceSyncRunRepository,
        authz: AuthorizationProvider,
        scope_to_tenant: str,
        probe: DataSourceServiceProbe | None = None,
    ) -> None:
        """Initialize DataSourceService with dependencies.

        Args:
            session: Database session for transaction management
            data_source_repository: Repository for DS persistence
            knowledge_graph_repository: Repository for KG lookups
            secret_store: Secret store for credential management
            sync_run_repository: Repository for sync run tracking
            authz: Authorization provider for permission checks
            scope_to_tenant: Tenant ID string to scope this service to
            probe: Optional domain probe for observability; a
                DefaultDataSourceServiceProbe is used when omitted
        """
        self._session = session
        self._ds_repo = data_source_repository
        self._kg_repo = knowledge_graph_repository
        self._secret_store = secret_store
        self._sync_run_repo = sync_run_repository
        self._authz = authz
        self._scope_to_tenant = scope_to_tenant
        self._probe = probe or DefaultDataSourceServiceProbe()

    async def _check_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        permission: Permission,
    ) -> bool:
        """Check if user has permission on a resource.

        Args:
            user_id: The user to check
            resource_type: Type of resource
            resource_id: ID of the resource
            permission: The permission to check

        Returns:
            True if user has permission, False otherwise
        """
        resource = format_resource(resource_type, resource_id)
        subject = format_subject(ResourceType.USER, user_id)
        return await self._authz.check_permission(
            resource=resource,
            permission=permission,
            subject=subject,
        )

    async def _require_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        resource_id: str,
        permission: Permission,
        *,
        action: str,
        resource_label: str,
    ) -> None:
        """Raise UnauthorizedError unless the user holds the permission.

        A denial is reported to the probe before raising.

        Args:
            user_id: The user to check
            resource_type: Type of resource
            resource_id: ID of the resource
            permission: The permission to check
            action: Lower-case permission word used in the error message
                (e.g. "edit")
            resource_label: Human-readable resource kind used in the error
                message (e.g. "data source")

        Raises:
            UnauthorizedError: If the permission check fails
        """
        allowed = await self._check_permission(
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            permission=permission,
        )
        if allowed:
            return

        self._probe.permission_denied(
            user_id=user_id,
            resource_id=resource_id,
            permission=permission,
        )
        raise UnauthorizedError(
            f"User {user_id} lacks {action} permission "
            f"on {resource_label} {resource_id}"
        )

    async def _store_credentials(
        self,
        ds: DataSource,
        raw_credentials: dict[str, str],
    ) -> None:
        """Store credentials in the secret store and record their path on ds.

        The path is derived from the data source ID, so storing again for
        the same data source targets the same path.
        """
        cred_path = f"datasource/{ds.id.value}/credentials"
        await self._secret_store.store(
            path=cred_path,
            tenant_id=self._scope_to_tenant,
            credentials=raw_credentials,
        )
        ds.credentials_path = cred_path

    async def create(
        self,
        user_id: str,
        kg_id: str,
        name: str,
        adapter_type: DataSourceAdapterType,
        connection_config: dict[str, str],
        raw_credentials: dict[str, str] | None = None,
    ) -> DataSource:
        """Create a new data source in a knowledge graph.

        Args:
            user_id: The user creating the DS
            kg_id: The knowledge graph to create the DS in
            name: Name of the data source
            adapter_type: Type of adapter (e.g., GITHUB)
            connection_config: Connection configuration key-value pairs
            raw_credentials: Optional credentials to hand to the secret store

        Returns:
            The created DataSource aggregate

        Raises:
            UnauthorizedError: If user lacks EDIT permission on KG
            ValueError: If KG not found or belongs to different tenant
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.EDIT,
            action="edit",
            resource_label="knowledge graph",
        )

        # Verify KG exists and belongs to tenant
        kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
        if kg is None:
            raise ValueError(f"Knowledge graph {kg_id} not found")
        if kg.tenant_id != self._scope_to_tenant:
            raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant")

        async with self._session.begin():
            ds = DataSource.create(
                knowledge_graph_id=kg_id,
                tenant_id=self._scope_to_tenant,
                name=name,
                adapter_type=adapter_type,
                connection_config=connection_config,
                created_by=user_id,
            )

            if raw_credentials is not None:
                await self._store_credentials(ds, raw_credentials)

            await self._ds_repo.save(ds)

        self._probe.data_source_created(
            ds_id=ds.id.value,
            kg_id=kg_id,
            tenant_id=self._scope_to_tenant,
            name=name,
        )

        return ds

    async def get(
        self,
        user_id: str,
        ds_id: str,
    ) -> DataSource | None:
        """Get a data source by ID with authorization check.

        The lookup happens before the permission check, so a missing data
        source yields None regardless of the caller's permissions.

        Args:
            user_id: The user requesting access
            ds_id: The data source ID

        Returns:
            The DataSource aggregate, or None if not found

        Raises:
            UnauthorizedError: If user lacks VIEW permission
        """
        ds = await self._ds_repo.get_by_id(DataSourceId(value=ds_id))
        if ds is None:
            return None

        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.VIEW,
            action="view",
            resource_label="data source",
        )

        self._probe.data_source_retrieved(ds_id=ds_id)
        return ds

    async def list_for_knowledge_graph(
        self,
        user_id: str,
        kg_id: str,
    ) -> list[DataSource]:
        """List data sources for a knowledge graph.

        Args:
            user_id: The user requesting the list
            kg_id: The knowledge graph to list DSes for

        Returns:
            List of DataSource aggregates

        Raises:
            UnauthorizedError: If user lacks VIEW permission on KG
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.KNOWLEDGE_GRAPH,
            resource_id=kg_id,
            permission=Permission.VIEW,
            action="view",
            resource_label="knowledge graph",
        )

        data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id)

        self._probe.data_sources_listed(
            kg_id=kg_id,
            count=len(data_sources),
        )

        return data_sources

    async def update(
        self,
        user_id: str,
        ds_id: str,
        name: str | None = None,
        connection_config: dict[str, str] | None = None,
        raw_credentials: dict[str, str] | None = None,
    ) -> DataSource:
        """Update a data source's configuration.

        ``None`` means "leave unchanged". Any other value, including an
        empty string or an empty dict, is passed to the aggregate as given.

        Args:
            user_id: The user performing the update
            ds_id: The data source ID
            name: Optional new name
            connection_config: Optional new connection configuration
            raw_credentials: Optional new credentials to hand to the secret
                store

        Returns:
            The updated DataSource aggregate

        Raises:
            UnauthorizedError: If user lacks EDIT permission
            ValueError: If DS not found
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.EDIT,
            action="edit",
            resource_label="data source",
        )

        ds = await self._ds_repo.get_by_id(DataSourceId(value=ds_id))
        if ds is None:
            raise ValueError(f"Data source {ds_id} not found")

        async with self._session.begin():
            if name is not None or connection_config is not None:
                # Fall back on `is None`, not truthiness: an explicit empty
                # config ({}) is a real value and must not be replaced by
                # the stored one.
                ds.update_connection(
                    name=ds.name if name is None else name,
                    connection_config=(
                        ds.connection_config
                        if connection_config is None
                        else connection_config
                    ),
                    credentials_path=ds.credentials_path,
                    updated_by=user_id,
                )

            if raw_credentials is not None:
                await self._store_credentials(ds, raw_credentials)

            await self._ds_repo.save(ds)

        self._probe.data_source_updated(
            ds_id=ds_id,
            name=name if name is not None else ds.name,
        )

        return ds

    async def delete(
        self,
        user_id: str,
        ds_id: str,
    ) -> bool:
        """Delete a data source.

        Stored credentials (if any) are removed from the secret store before
        the aggregate is marked for deletion and removed via the repository.

        Args:
            user_id: The user performing the deletion
            ds_id: The data source ID

        Returns:
            True if deleted, False if not found

        Raises:
            UnauthorizedError: If user lacks MANAGE permission
        """
        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.MANAGE,
            action="manage",
            resource_label="data source",
        )

        ds = await self._ds_repo.get_by_id(DataSourceId(value=ds_id))
        if ds is None:
            return False

        async with self._session.begin():
            if ds.credentials_path:
                await self._secret_store.delete(
                    path=ds.credentials_path,
                    tenant_id=self._scope_to_tenant,
                )

            ds.mark_for_deletion(deleted_by=user_id)
            await self._ds_repo.delete(ds)

        self._probe.data_source_deleted(ds_id=ds_id)

        return True

    async def trigger_sync(
        self,
        user_id: str,
        ds_id: str,
    ) -> DataSourceSyncRun:
        """Trigger a sync for a data source.

        Persists a new sync run with status "pending" and records a
        DataSourceSyncRequested event on the data source before saving it.

        Args:
            user_id: The user triggering the sync
            ds_id: The data source ID

        Returns:
            The created DataSourceSyncRun entity

        Raises:
            UnauthorizedError: If user lacks MANAGE permission
            ValueError: If DS not found
        """
        # Function-local import, resolved before the transaction is opened.
        from ulid import ULID

        await self._require_permission(
            user_id=user_id,
            resource_type=ResourceType.DATA_SOURCE,
            resource_id=ds_id,
            permission=Permission.MANAGE,
            action="manage",
            resource_label="data source",
        )

        ds = await self._ds_repo.get_by_id(DataSourceId(value=ds_id))
        if ds is None:
            raise ValueError(f"Data source {ds_id} not found")

        now = datetime.now(UTC)

        async with self._session.begin():
            sync_run = DataSourceSyncRun(
                id=str(ULID()),
                data_source_id=ds.id.value,
                status="pending",
                started_at=now,
                completed_at=None,
                error=None,
                created_at=now,
            )
            await self._sync_run_repo.save(sync_run)

            # Record sync requested event on the data source aggregate.
            # NOTE(review): this appends to the aggregate's private
            # _pending_events list; prefer a public aggregate method if one
            # exists.
            ds._pending_events.append(
                DataSourceSyncRequested(
                    data_source_id=ds.id.value,
                    knowledge_graph_id=ds.knowledge_graph_id,
                    tenant_id=ds.tenant_id,
                    occurred_at=now,
                    requested_by=user_id,
                )
            )
            await self._ds_repo.save(ds)

        self._probe.sync_requested(ds_id=ds_id)

        return sync_run
+""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from management.application.services.data_source_service import DataSourceService +from management.domain.aggregates import DataSource, KnowledgeGraph +from management.domain.value_objects import ( + DataSourceId, + KnowledgeGraphId, + Schedule, + ScheduleType, +) +from management.ports.exceptions import UnauthorizedError +from shared_kernel.authorization.types import Permission +from shared_kernel.datasource_types import DataSourceAdapterType + + +@pytest.fixture +def mock_session(): + """Create a mock AsyncSession with begin() context manager.""" + session = MagicMock() + + @asynccontextmanager + async def _begin(): + yield + + session.begin = _begin + return session + + +@pytest.fixture +def mock_ds_repo(): + return AsyncMock() + + +@pytest.fixture +def mock_kg_repo(): + return AsyncMock() + + +@pytest.fixture +def mock_secret_store(): + return AsyncMock() + + +@pytest.fixture +def mock_sync_run_repo(): + return AsyncMock() + + +@pytest.fixture +def mock_authz(): + return AsyncMock() + + +@pytest.fixture +def mock_probe(): + return MagicMock() + + +@pytest.fixture +def tenant_id(): + return "tenant-123" + + +@pytest.fixture +def user_id(): + return "user-456" + + +@pytest.fixture +def kg_id(): + return "kg-789" + + +@pytest.fixture +def service( + mock_session, + mock_ds_repo, + mock_kg_repo, + mock_secret_store, + mock_sync_run_repo, + mock_authz, + mock_probe, + tenant_id, +): + return DataSourceService( + session=mock_session, + data_source_repository=mock_ds_repo, + knowledge_graph_repository=mock_kg_repo, + secret_store=mock_secret_store, + sync_run_repository=mock_sync_run_repo, + authz=mock_authz, + scope_to_tenant=tenant_id, + probe=mock_probe, + ) + + +def _make_kg( + kg_id: str = "kg-789", + tenant_id: str = "tenant-123", + workspace_id: str = "ws-001", +) -> 
KnowledgeGraph: + now = datetime.now(UTC) + kg = KnowledgeGraph( + id=KnowledgeGraphId(value=kg_id), + tenant_id=tenant_id, + workspace_id=workspace_id, + name="Test KG", + description="A test KG", + created_at=now, + updated_at=now, + ) + kg.collect_events() + return kg + + +def _make_ds( + ds_id: str = "ds-001", + kg_id: str = "kg-789", + tenant_id: str = "tenant-123", + name: str = "Test DS", + credentials_path: str | None = None, +) -> DataSource: + now = datetime.now(UTC) + ds = DataSource( + id=DataSourceId(value=ds_id), + knowledge_graph_id=kg_id, + tenant_id=tenant_id, + name=name, + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + credentials_path=credentials_path, + schedule=Schedule(schedule_type=ScheduleType.MANUAL), + last_sync_at=None, + created_at=now, + updated_at=now, + ) + ds.collect_events() + return ds + + +# ---- create ---- + + +class TestDataSourceServiceCreate: + """Tests for DataSourceService.create.""" + + @pytest.mark.asyncio + async def test_create_checks_edit_permission_on_kg( + self, service, mock_authz, user_id, kg_id, mock_kg_repo, tenant_id + ): + """create() must check EDIT permission on the knowledge graph.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg_id}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_create_raises_unauthorized_when_permission_denied( + self, service, mock_authz, mock_probe, user_id, kg_id + ): + """create() raises UnauthorizedError when user lacks EDIT on KG.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + 
await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + ) + + mock_probe.permission_denied.assert_called_once() + + @pytest.mark.asyncio + async def test_create_verifies_kg_exists_and_belongs_to_tenant( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """create() raises ValueError when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + with pytest.raises(ValueError, match="not found"): + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={}, + ) + + @pytest.mark.asyncio + async def test_create_rejects_kg_from_different_tenant( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """create() raises ValueError when KG belongs to different tenant.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg( + kg_id=kg_id, tenant_id="other-tenant" + ) + + with pytest.raises(ValueError, match="different tenant"): + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={}, + ) + + @pytest.mark.asyncio + async def test_create_stores_credentials_when_provided( + self, + service, + mock_authz, + mock_kg_repo, + mock_secret_store, + mock_ds_repo, + user_id, + kg_id, + tenant_id, + ): + """create() stores credentials via secret store when raw_credentials provided.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + creds = {"token": "abc123"} + + await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={"url": "https://github.com"}, + raw_credentials=creds, + ) + + 
mock_secret_store.store.assert_called_once() + call_kwargs = mock_secret_store.store.call_args + assert "datasource/" in call_kwargs.kwargs.get("path", "") or "datasource/" in ( + call_kwargs.args[0] if call_kwargs.args else "" + ) + + @pytest.mark.asyncio + async def test_create_probes_success( + self, service, mock_authz, mock_kg_repo, mock_probe, user_id, kg_id, tenant_id + ): + """create() calls probe on success.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) + + result = await service.create( + user_id=user_id, + kg_id=kg_id, + name="My DS", + adapter_type=DataSourceAdapterType.GITHUB, + connection_config={}, + ) + + mock_probe.data_source_created.assert_called_once_with( + ds_id=result.id.value, + kg_id=kg_id, + tenant_id=tenant_id, + name="My DS", + ) + + +# ---- get ---- + + +class TestDataSourceServiceGet: + """Tests for DataSourceService.get.""" + + @pytest.mark.asyncio + async def test_get_returns_none_when_not_found( + self, service, mock_ds_repo, user_id + ): + """get() returns None when DS not found.""" + mock_ds_repo.get_by_id.return_value = None + + result = await service.get(user_id=user_id, ds_id="nonexistent") + + assert result is None + + @pytest.mark.asyncio + async def test_get_checks_view_permission( + self, service, mock_authz, mock_ds_repo, user_id + ): + """get() checks VIEW permission on the data source.""" + ds = _make_ds() + mock_ds_repo.get_by_id.return_value = ds + mock_authz.check_permission.return_value = True + + await service.get(user_id=user_id, ds_id=ds.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_get_raises_unauthorized_when_denied( + self, service, mock_authz, mock_ds_repo, user_id + ): + """get() raises UnauthorizedError when user lacks VIEW.""" + ds = _make_ds() + 
mock_ds_repo.get_by_id.return_value = ds + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.get(user_id=user_id, ds_id=ds.id.value) + + @pytest.mark.asyncio + async def test_get_returns_aggregate_on_success( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id + ): + """get() returns the aggregate when authorized.""" + ds = _make_ds() + mock_ds_repo.get_by_id.return_value = ds + mock_authz.check_permission.return_value = True + + result = await service.get(user_id=user_id, ds_id=ds.id.value) + + assert result is ds + mock_probe.data_source_retrieved.assert_called_once_with(ds_id=ds.id.value) + + +# ---- list_for_knowledge_graph ---- + + +class TestDataSourceServiceListForKnowledgeGraph: + """Tests for DataSourceService.list_for_knowledge_graph.""" + + @pytest.mark.asyncio + async def test_list_checks_view_permission_on_kg( + self, service, mock_authz, mock_ds_repo, user_id, kg_id + ): + """list_for_knowledge_graph() checks VIEW on the KG.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.find_by_knowledge_graph.return_value = [] + + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + mock_authz.check_permission.assert_called_once_with( + resource=f"knowledge_graph:{kg_id}", + permission=Permission.VIEW, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_list_raises_unauthorized_when_denied( + self, service, mock_authz, user_id, kg_id + ): + """list_for_knowledge_graph() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + @pytest.mark.asyncio + async def test_list_returns_data_sources( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id, kg_id + ): + """list_for_knowledge_graph() returns data sources from repo.""" + mock_authz.check_permission.return_value = True + ds1 = 
_make_ds(ds_id="ds-001") + ds2 = _make_ds(ds_id="ds-002") + mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] + + result = await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + + assert len(result) == 2 + mock_probe.data_sources_listed.assert_called_once_with( + kg_id=kg_id, + count=2, + ) + + +# ---- update ---- + + +class TestDataSourceServiceUpdate: + """Tests for DataSourceService.update.""" + + @pytest.mark.asyncio + async def test_update_checks_edit_permission_on_ds( + self, service, mock_authz, mock_ds_repo, user_id + ): + """update() checks EDIT permission on the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + await service.update( + user_id=user_id, + ds_id=ds.id.value, + name="Updated", + connection_config={"url": "https://new.com"}, + ) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.EDIT, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_update_raises_unauthorized_when_denied( + self, service, mock_authz, user_id + ): + """update() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.update( + user_id=user_id, + ds_id="ds-001", + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_raises_value_error_when_not_found( + self, service, mock_authz, mock_ds_repo, user_id + ): + """update() raises ValueError when DS not found.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = None + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + ds_id="nonexistent", + name="Updated", + ) + + @pytest.mark.asyncio + async def test_update_stores_credentials_when_provided( + self, service, mock_authz, mock_ds_repo, mock_secret_store, user_id, tenant_id + ): + """update() stores 
credentials via secret store when raw_credentials provided.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + creds = {"token": "new-token"} + + await service.update( + user_id=user_id, + ds_id=ds.id.value, + raw_credentials=creds, + ) + + mock_secret_store.store.assert_called_once() + + @pytest.mark.asyncio + async def test_update_probes_success( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id + ): + """update() probes success when name is updated.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + await service.update( + user_id=user_id, + ds_id=ds.id.value, + name="Updated", + connection_config={"url": "https://new.com"}, + ) + + mock_probe.data_source_updated.assert_called_once_with( + ds_id=ds.id.value, + name="Updated", + ) + + +# ---- delete ---- + + +class TestDataSourceServiceDelete: + """Tests for DataSourceService.delete.""" + + @pytest.mark.asyncio + async def test_delete_checks_manage_permission_on_ds( + self, service, mock_authz, mock_ds_repo, user_id + ): + """delete() checks MANAGE permission on the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + mock_ds_repo.delete.return_value = True + + await service.delete(user_id=user_id, ds_id=ds.id.value) + + mock_authz.check_permission.assert_called_once_with( + resource=f"data_source:{ds.id.value}", + permission=Permission.MANAGE, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_delete_raises_unauthorized_when_denied( + self, service, mock_authz, user_id + ): + """delete() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.delete(user_id=user_id, ds_id="ds-001") + + @pytest.mark.asyncio + async def test_delete_returns_false_when_not_found( + self, service, 
mock_authz, mock_ds_repo, user_id + ): + """delete() returns False when DS not found.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = None + + result = await service.delete(user_id=user_id, ds_id="nonexistent") + + assert result is False + + @pytest.mark.asyncio + async def test_delete_removes_credentials_if_path_exists( + self, service, mock_authz, mock_ds_repo, mock_secret_store, user_id, tenant_id + ): + """delete() deletes credentials from secret store if credentials_path is set.""" + ds = _make_ds(credentials_path="datasource/ds-001/credentials") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + mock_ds_repo.delete.return_value = True + + await service.delete(user_id=user_id, ds_id=ds.id.value) + + mock_secret_store.delete.assert_called_once_with( + path="datasource/ds-001/credentials", + tenant_id=tenant_id, + ) + + @pytest.mark.asyncio + async def test_delete_probes_success( + self, service, mock_authz, mock_ds_repo, mock_probe, user_id + ): + """delete() calls probe on success.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + mock_ds_repo.delete.return_value = True + + await service.delete(user_id=user_id, ds_id=ds.id.value) + + mock_probe.data_source_deleted.assert_called_once_with(ds_id=ds.id.value) + + +# ---- trigger_sync ---- + + +class TestDataSourceServiceTriggerSync: + """Tests for DataSourceService.trigger_sync.""" + + @pytest.mark.asyncio + async def test_trigger_sync_checks_manage_permission( + self, service, mock_authz, mock_ds_repo, mock_sync_run_repo, user_id + ): + """trigger_sync() checks MANAGE permission on the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + await service.trigger_sync(user_id=user_id, ds_id=ds.id.value) + + mock_authz.check_permission.assert_called_once_with( + 
resource=f"data_source:{ds.id.value}", + permission=Permission.MANAGE, + subject=f"user:{user_id}", + ) + + @pytest.mark.asyncio + async def test_trigger_sync_raises_unauthorized_when_denied( + self, service, mock_authz, user_id + ): + """trigger_sync() raises UnauthorizedError when denied.""" + mock_authz.check_permission.return_value = False + + with pytest.raises(UnauthorizedError): + await service.trigger_sync(user_id=user_id, ds_id="ds-001") + + @pytest.mark.asyncio + async def test_trigger_sync_raises_value_error_when_not_found( + self, service, mock_authz, mock_ds_repo, user_id + ): + """trigger_sync() raises ValueError when DS not found.""" + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = None + + with pytest.raises(ValueError): + await service.trigger_sync(user_id=user_id, ds_id="nonexistent") + + @pytest.mark.asyncio + async def test_trigger_sync_creates_sync_run_and_saves_ds( + self, service, mock_authz, mock_ds_repo, mock_sync_run_repo, mock_probe, user_id + ): + """trigger_sync() creates a sync run and saves the data source.""" + ds = _make_ds() + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + result = await service.trigger_sync(user_id=user_id, ds_id=ds.id.value) + + assert result.data_source_id == ds.id.value + assert result.status == "pending" + mock_sync_run_repo.save.assert_called_once() + mock_ds_repo.save.assert_called_once() + mock_probe.sync_requested.assert_called_once_with(ds_id=ds.id.value) From 39e707e98c09e82ac15d7a4381d5f09ad4540b30 Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 16:40:43 -0400 Subject: [PATCH 04/10] feat(management): add DI factory functions for application services Add FastAPI dependency injection factories for KnowledgeGraphService and DataSourceService, wiring repositories, secret store, authz, and tenant scoping from CurrentUser. 
Update architecture test to exclude management.dependencies from cross-context isolation checks since DI wiring is a presentation-layer concern. Co-Authored-By: Claude Opus 4.6 --- src/api/management/dependencies/__init__.py | 1 + .../management/dependencies/data_source.py | 63 +++++++++++++++++++ .../dependencies/knowledge_graph.py | 53 ++++++++++++++++ .../unit/management/test_architecture.py | 5 ++ 4 files changed, 122 insertions(+) create mode 100644 src/api/management/dependencies/__init__.py create mode 100644 src/api/management/dependencies/data_source.py create mode 100644 src/api/management/dependencies/knowledge_graph.py diff --git a/src/api/management/dependencies/__init__.py b/src/api/management/dependencies/__init__.py new file mode 100644 index 00000000..a4f1148c --- /dev/null +++ b/src/api/management/dependencies/__init__.py @@ -0,0 +1 @@ +"""FastAPI dependency injection for Management bounded context.""" diff --git a/src/api/management/dependencies/data_source.py b/src/api/management/dependencies/data_source.py new file mode 100644 index 00000000..d79271f3 --- /dev/null +++ b/src/api/management/dependencies/data_source.py @@ -0,0 +1,63 @@ +"""FastAPI dependency injection for DataSourceService. + +Provides DataSourceService instances for route handlers +using FastAPI's dependency injection system. 
+""" + +from typing import Annotated + +from fastapi import Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from iam.application.value_objects import CurrentUser +from iam.dependencies.user import get_current_user +from infrastructure.authorization_dependencies import get_spicedb_client +from infrastructure.database.dependencies import get_write_session +from infrastructure.outbox.repository import OutboxRepository +from infrastructure.settings import get_management_settings +from management.application.observability import DefaultDataSourceServiceProbe +from management.application.services.data_source_service import DataSourceService +from management.infrastructure.repositories import ( + DataSourceRepository, + DataSourceSyncRunRepository, + FernetSecretStore, + KnowledgeGraphRepository, +) +from shared_kernel.authorization.protocols import AuthorizationProvider + + +def get_data_source_service( + session: Annotated[AsyncSession, Depends(get_write_session)], + authz: Annotated[AuthorizationProvider, Depends(get_spicedb_client)], + current_user: Annotated[CurrentUser, Depends(get_current_user)], +) -> DataSourceService: + """Get DataSourceService instance. 
+ + Args: + session: Async database session for transaction management + authz: Authorization provider (SpiceDB client) + current_user: The current user, from which tenant ID is extracted + + Returns: + DataSourceService instance scoped to the current tenant + """ + settings = get_management_settings() + outbox = OutboxRepository(session=session) + ds_repo = DataSourceRepository(session=session, outbox=outbox) + kg_repo = KnowledgeGraphRepository(session=session, outbox=outbox) + encryption_keys = settings.encryption_key.get_secret_value().split(",") + secret_store = FernetSecretStore( + session=session, + encryption_keys=encryption_keys, + ) + sync_run_repo = DataSourceSyncRunRepository(session=session) + return DataSourceService( + session=session, + data_source_repository=ds_repo, + knowledge_graph_repository=kg_repo, + secret_store=secret_store, + sync_run_repository=sync_run_repo, + authz=authz, + scope_to_tenant=current_user.tenant_id.value, + probe=DefaultDataSourceServiceProbe(), + ) diff --git a/src/api/management/dependencies/knowledge_graph.py b/src/api/management/dependencies/knowledge_graph.py new file mode 100644 index 00000000..48c495d9 --- /dev/null +++ b/src/api/management/dependencies/knowledge_graph.py @@ -0,0 +1,53 @@ +"""FastAPI dependency injection for KnowledgeGraphService. + +Provides KnowledgeGraphService instances for route handlers +using FastAPI's dependency injection system. 
+""" + +from typing import Annotated + +from fastapi import Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from iam.application.value_objects import CurrentUser +from iam.dependencies.user import get_current_user +from infrastructure.authorization_dependencies import get_spicedb_client +from infrastructure.database.dependencies import get_write_session +from infrastructure.outbox.repository import OutboxRepository +from management.application.observability import DefaultKnowledgeGraphServiceProbe +from management.application.services.knowledge_graph_service import ( + KnowledgeGraphService, +) +from management.infrastructure.repositories import ( + DataSourceRepository, + KnowledgeGraphRepository, +) +from shared_kernel.authorization.protocols import AuthorizationProvider + + +def get_knowledge_graph_service( + session: Annotated[AsyncSession, Depends(get_write_session)], + authz: Annotated[AuthorizationProvider, Depends(get_spicedb_client)], + current_user: Annotated[CurrentUser, Depends(get_current_user)], +) -> KnowledgeGraphService: + """Get KnowledgeGraphService instance. 
+ + Args: + session: Async database session for transaction management + authz: Authorization provider (SpiceDB client) + current_user: The current user, from which tenant ID is extracted + + Returns: + KnowledgeGraphService instance scoped to the current tenant + """ + outbox = OutboxRepository(session=session) + kg_repo = KnowledgeGraphRepository(session=session, outbox=outbox) + ds_repo = DataSourceRepository(session=session, outbox=outbox) + return KnowledgeGraphService( + session=session, + knowledge_graph_repository=kg_repo, + data_source_repository=ds_repo, + authz=authz, + scope_to_tenant=current_user.tenant_id.value, + probe=DefaultKnowledgeGraphServiceProbe(), + ) diff --git a/src/api/tests/unit/management/test_architecture.py b/src/api/tests/unit/management/test_architecture.py index bd1b2489..dd6addae 100644 --- a/src/api/tests/unit/management/test_architecture.py +++ b/src/api/tests/unit/management/test_architecture.py @@ -226,10 +226,15 @@ def test_management_does_not_import_iam(self): IAM manages authentication and authorization. Management should not couple to IAM's user, tenant, or API key domain objects. + + The management.dependencies package is excluded because DI wiring + is a presentation-layer concern that legitimately crosses context + boundaries (e.g., extracting CurrentUser from IAM for tenant scoping). 
""" ( archrule("management_no_iam") .match("management*") + .exclude("management.dependencies*") .should_not_import("iam*") .check("management") ) From 8dbf82c7c81ee66887cf66eef47f0dddbbb8a176 Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 16:47:21 -0400 Subject: [PATCH 05/10] fix(management): add tenant scoping, fix existence leakage, and encapsulate sync request Security fixes: - Add tenant_id verification in KG get/update/delete methods - Add tenant_id verification in DS get/update/delete/trigger_sync methods - Return None on permission denied in get() to prevent existence leakage DDD fix: - Add DataSource.request_sync() domain method instead of directly accessing _pending_events from the application service Tests: - Add tenant scoping unit tests for all affected methods - Add get() returns None on permission denied tests - Add DataSource.request_sync() unit tests Co-Authored-By: Claude Opus 4.6 --- .../services/data_source_service.py | 32 +++++----- .../services/knowledge_graph_service.py | 18 +++--- .../domain/aggregates/data_source.py | 24 +++++++ .../application/test_data_source_service.py | 62 +++++++++++++++++-- .../test_knowledge_graph_service.py | 51 +++++++++++++-- .../tests/unit/management/test_data_source.py | 48 ++++++++++++++ 6 files changed, 201 insertions(+), 34 deletions(-) diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index 41ead1bc..edeb7f4c 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -16,7 +16,6 @@ ) from management.domain.aggregates import DataSource from management.domain.entities import DataSourceSyncRun -from management.domain.events import DataSourceSyncRequested from management.domain.value_objects import DataSourceId, KnowledgeGraphId from management.ports.exceptions import UnauthorizedError from 
management.ports.repositories import ( @@ -201,6 +200,9 @@ async def get( if ds is None: return None + if ds.tenant_id != self._scope_to_tenant: + return None + has_view = await self._check_permission( user_id=user_id, resource_type=ResourceType.DATA_SOURCE, @@ -209,14 +211,7 @@ async def get( ) if not has_view: - self._probe.permission_denied( - user_id=user_id, - resource_id=ds_id, - permission=Permission.VIEW, - ) - raise UnauthorizedError( - f"User {user_id} lacks view permission on data source {ds_id}" - ) + return None self._probe.data_source_retrieved(ds_id=ds_id) return ds @@ -309,6 +304,9 @@ async def update( if ds is None: raise ValueError(f"Data source {ds_id} not found") + if ds.tenant_id != self._scope_to_tenant: + raise ValueError(f"Data source {ds_id} not found") + async with self._session.begin(): if name is not None or connection_config is not None: ds.update_connection( @@ -374,6 +372,9 @@ async def delete( if ds is None: return False + if ds.tenant_id != self._scope_to_tenant: + return False + async with self._session.begin(): if ds.credentials_path: await self._secret_store.delete( @@ -427,6 +428,9 @@ async def trigger_sync( if ds is None: raise ValueError(f"Data source {ds_id} not found") + if ds.tenant_id != self._scope_to_tenant: + raise ValueError(f"Data source {ds_id} not found") + now = datetime.now(UTC) async with self._session.begin(): @@ -444,15 +448,7 @@ async def trigger_sync( await self._sync_run_repo.save(sync_run) # Record sync requested event on the data source aggregate - ds._pending_events.append( - DataSourceSyncRequested( - data_source_id=ds.id.value, - knowledge_graph_id=ds.knowledge_graph_id, - tenant_id=ds.tenant_id, - occurred_at=now, - requested_by=user_id, - ) - ) + ds.request_sync(requested_by=user_id) await self._ds_repo.save(ds) self._probe.sync_requested(ds_id=ds_id) diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py 
index ede630ef..f00d6fe6 100644 --- a/src/api/management/application/services/knowledge_graph_service.py +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -182,6 +182,9 @@ async def get( if kg is None: return None + if kg.tenant_id != self._scope_to_tenant: + return None + has_view = await self._check_permission( user_id=user_id, resource_type=ResourceType.KNOWLEDGE_GRAPH, @@ -190,14 +193,7 @@ async def get( ) if not has_view: - self._probe.permission_denied( - user_id=user_id, - resource_id=kg_id, - permission=Permission.VIEW, - ) - raise UnauthorizedError( - f"User {user_id} lacks view permission on knowledge graph {kg_id}" - ) + return None self._probe.knowledge_graph_retrieved(kg_id=kg_id) return kg @@ -313,6 +309,9 @@ async def update( if kg is None: raise ValueError(f"Knowledge graph {kg_id} not found") + if kg.tenant_id != self._scope_to_tenant: + raise ValueError(f"Knowledge graph {kg_id} not found") + kg.update(name=name, description=description, updated_by=user_id) try: @@ -365,6 +364,9 @@ async def delete( if kg is None: return False + if kg.tenant_id != self._scope_to_tenant: + return False + async with self._session.begin(): # Cascade delete data sources if repo is available if self._ds_repo is not None: diff --git a/src/api/management/domain/aggregates/data_source.py b/src/api/management/domain/aggregates/data_source.py index bfde0377..eb4400e0 100644 --- a/src/api/management/domain/aggregates/data_source.py +++ b/src/api/management/domain/aggregates/data_source.py @@ -9,6 +9,7 @@ from management.domain.events import ( DataSourceCreated, DataSourceDeleted, + DataSourceSyncRequested, DataSourceUpdated, ) from management.domain.exceptions import ( @@ -211,6 +212,29 @@ def update_connection( name=name, ) + def request_sync(self, *, requested_by: str | None = None) -> None: + """Request a sync for this data source. + + Records a DataSourceSyncRequested event. 
+ + Args: + requested_by: The user who requested the sync (optional) + + Raises: + AggregateDeletedError: If the data source has been marked for deletion + """ + if self._deleted: + raise AggregateDeletedError("Cannot request sync on a deleted data source") + self._pending_events.append( + DataSourceSyncRequested( + data_source_id=self.id.value, + knowledge_graph_id=self.knowledge_graph_id, + tenant_id=self.tenant_id, + occurred_at=datetime.now(UTC), + requested_by=requested_by, + ) + ) + def record_sync_completed(self) -> None: """Record that a sync has completed. diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index 0d0017cb..71360e11 100644 --- a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -323,16 +323,29 @@ async def test_get_checks_view_permission( ) @pytest.mark.asyncio - async def test_get_raises_unauthorized_when_denied( + async def test_get_returns_none_for_different_tenant( + self, service, mock_ds_repo, user_id + ): + """get() returns None when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_ds_repo.get_by_id.return_value = ds + + result = await service.get(user_id=user_id, ds_id=ds.id.value) + + assert result is None + + @pytest.mark.asyncio + async def test_get_returns_none_when_permission_denied( self, service, mock_authz, mock_ds_repo, user_id ): - """get() raises UnauthorizedError when user lacks VIEW.""" + """get() returns None when user lacks VIEW (no existence leakage).""" ds = _make_ds() mock_ds_repo.get_by_id.return_value = ds mock_authz.check_permission.return_value = False - with pytest.raises(UnauthorizedError): - await service.get(user_id=user_id, ds_id=ds.id.value) + result = await service.get(user_id=user_id, ds_id=ds.id.value) + + assert result is None @pytest.mark.asyncio async def 
test_get_returns_aggregate_on_success( @@ -457,6 +470,22 @@ async def test_update_raises_value_error_when_not_found( name="Updated", ) + @pytest.mark.asyncio + async def test_update_rejects_different_tenant( + self, service, mock_authz, mock_ds_repo, user_id + ): + """update() raises ValueError when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + ds_id=ds.id.value, + name="Updated", + ) + @pytest.mark.asyncio async def test_update_stores_credentials_when_provided( self, service, mock_authz, mock_ds_repo, mock_secret_store, user_id, tenant_id @@ -543,6 +572,19 @@ async def test_delete_returns_false_when_not_found( assert result is False + @pytest.mark.asyncio + async def test_delete_returns_false_for_different_tenant( + self, service, mock_authz, mock_ds_repo, user_id + ): + """delete() returns False when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + result = await service.delete(user_id=user_id, ds_id=ds.id.value) + + assert result is False + @pytest.mark.asyncio async def test_delete_removes_credentials_if_path_exists( self, service, mock_authz, mock_ds_repo, mock_secret_store, user_id, tenant_id @@ -619,6 +661,18 @@ async def test_trigger_sync_raises_value_error_when_not_found( with pytest.raises(ValueError): await service.trigger_sync(user_id=user_id, ds_id="nonexistent") + @pytest.mark.asyncio + async def test_trigger_sync_rejects_different_tenant( + self, service, mock_authz, mock_ds_repo, user_id + ): + """trigger_sync() raises ValueError when DS belongs to a different tenant.""" + ds = _make_ds(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_ds_repo.get_by_id.return_value = ds + + with 
pytest.raises(ValueError): + await service.trigger_sync(user_id=user_id, ds_id=ds.id.value) + @pytest.mark.asyncio async def test_trigger_sync_creates_sync_run_and_saves_ds( self, service, mock_authz, mock_ds_repo, mock_sync_run_repo, mock_probe, user_id diff --git a/src/api/tests/unit/management/application/test_knowledge_graph_service.py b/src/api/tests/unit/management/application/test_knowledge_graph_service.py index 36d9385b..44e0923f 100644 --- a/src/api/tests/unit/management/application/test_knowledge_graph_service.py +++ b/src/api/tests/unit/management/application/test_knowledge_graph_service.py @@ -260,16 +260,29 @@ async def test_get_checks_view_permission( ) @pytest.mark.asyncio - async def test_get_raises_unauthorized_when_permission_denied( + async def test_get_returns_none_for_different_tenant( + self, service, mock_kg_repo, user_id + ): + """get() returns None when KG belongs to a different tenant.""" + kg = _make_kg(tenant_id="other-tenant") + mock_kg_repo.get_by_id.return_value = kg + + result = await service.get(user_id=user_id, kg_id=kg.id.value) + + assert result is None + + @pytest.mark.asyncio + async def test_get_returns_none_when_permission_denied( self, service, mock_authz, mock_kg_repo, user_id ): - """get() raises UnauthorizedError when user lacks VIEW.""" + """get() returns None when user lacks VIEW (no existence leakage).""" kg = _make_kg() mock_kg_repo.get_by_id.return_value = kg mock_authz.check_permission.return_value = False - with pytest.raises(UnauthorizedError): - await service.get(user_id=user_id, kg_id=kg.id.value) + result = await service.get(user_id=user_id, kg_id=kg.id.value) + + assert result is None @pytest.mark.asyncio async def test_get_returns_aggregate_on_success( @@ -448,6 +461,23 @@ async def test_update_raises_value_error_when_not_found( description="Updated desc", ) + @pytest.mark.asyncio + async def test_update_rejects_different_tenant( + self, service, mock_authz, mock_kg_repo, user_id + ): + """update() raises 
ValueError when KG belongs to a different tenant.""" + kg = _make_kg(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + with pytest.raises(ValueError): + await service.update( + user_id=user_id, + kg_id=kg.id.value, + name="Updated", + description="Updated desc", + ) + @pytest.mark.asyncio async def test_update_calls_aggregate_update_and_saves( self, service, mock_authz, mock_kg_repo, mock_probe, user_id @@ -540,6 +570,19 @@ async def test_delete_returns_false_when_not_found( assert result is False + @pytest.mark.asyncio + async def test_delete_returns_false_for_different_tenant( + self, service, mock_authz, mock_kg_repo, user_id + ): + """delete() returns False when KG belongs to a different tenant.""" + kg = _make_kg(tenant_id="other-tenant") + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = kg + + result = await service.delete(user_id=user_id, kg_id=kg.id.value) + + assert result is False + @pytest.mark.asyncio async def test_delete_cascades_data_sources( self, service, mock_authz, mock_kg_repo, mock_ds_repo, user_id, tenant_id diff --git a/src/api/tests/unit/management/test_data_source.py b/src/api/tests/unit/management/test_data_source.py index e56ba6f2..034702d9 100644 --- a/src/api/tests/unit/management/test_data_source.py +++ b/src/api/tests/unit/management/test_data_source.py @@ -11,6 +11,7 @@ from management.domain.events import ( DataSourceCreated, DataSourceDeleted, + DataSourceSyncRequested, DataSourceUpdated, ) from management.domain.exceptions import ( @@ -301,6 +302,53 @@ def test_update_connection_raises_after_deletion(self): ) +class TestDataSourceRequestSync: + """Tests for DataSource.request_sync() method.""" + + def _create_ds(self, **kwargs): + """Helper to create a DataSource and clear creation events.""" + defaults = { + "knowledge_graph_id": "kg-123", + "tenant_id": "tenant-456", + "name": "Source", + "adapter_type": 
DataSourceAdapterType.GITHUB, + "connection_config": {}, + } + defaults.update(kwargs) + ds = DataSource.create(**defaults) + ds.collect_events() + return ds + + def test_request_sync_emits_sync_requested_event(self): + """request_sync() should emit a DataSourceSyncRequested event.""" + ds = self._create_ds() + ds.request_sync(requested_by="user-abc") + events = ds.collect_events() + assert len(events) == 1 + event = events[0] + assert isinstance(event, DataSourceSyncRequested) + assert event.data_source_id == ds.id.value + assert event.knowledge_graph_id == "kg-123" + assert event.tenant_id == "tenant-456" + assert event.requested_by == "user-abc" + assert event.occurred_at is not None + + def test_request_sync_without_actor(self): + """request_sync() without requested_by should set it to None.""" + ds = self._create_ds() + ds.request_sync() + events = ds.collect_events() + assert events[0].requested_by is None + + def test_request_sync_raises_after_deletion(self): + """request_sync() should raise AggregateDeletedError after mark_for_deletion().""" + ds = self._create_ds() + ds.mark_for_deletion() + ds.collect_events() + with pytest.raises(AggregateDeletedError): + ds.request_sync() + + class TestDataSourceRecordSyncCompleted: """Tests for DataSource.record_sync_completed() method.""" From 5e61e93e7a57db48c04f8fdeb97c65d59140a4ec Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 17:12:10 -0400 Subject: [PATCH 06/10] fix(management): fix get() docstrings and move ULID import to module level - Update get() docstrings in both services to document None return on permission denied (removed stale UnauthorizedError raises section) - Move ULID import from local scope to module level in DataSourceService Co-Authored-By: Claude Opus 4.6 --- .../application/services/data_source_service.py | 9 +++------ .../application/services/knowledge_graph_service.py | 6 ++---- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git 
a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index edeb7f4c..1dd91de7 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -9,6 +9,7 @@ from datetime import UTC, datetime from sqlalchemy.ext.asyncio import AsyncSession +from ulid import ULID from management.application.observability import ( DataSourceServiceProbe, @@ -191,10 +192,8 @@ async def get( ds_id: The data source ID Returns: - The DataSource aggregate, or None if not found - - Raises: - UnauthorizedError: If user lacks VIEW permission + The DataSource aggregate, or None if not found or if + the caller lacks VIEW permission (to avoid existence leakage) """ ds = await self._ds_repo.get_by_id(DataSourceId(value=ds_id)) if ds is None: @@ -434,8 +433,6 @@ async def trigger_sync( now = datetime.now(UTC) async with self._session.begin(): - from ulid import ULID - sync_run = DataSourceSyncRun( id=str(ULID()), data_source_id=ds.id.value, diff --git a/src/api/management/application/services/knowledge_graph_service.py b/src/api/management/application/services/knowledge_graph_service.py index f00d6fe6..ca187f04 100644 --- a/src/api/management/application/services/knowledge_graph_service.py +++ b/src/api/management/application/services/knowledge_graph_service.py @@ -173,10 +173,8 @@ async def get( kg_id: The knowledge graph ID Returns: - The KnowledgeGraph aggregate, or None if not found - - Raises: - UnauthorizedError: If user lacks VIEW permission + The KnowledgeGraph aggregate, or None if not found or if + the caller lacks VIEW permission (to avoid existence leakage) """ kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) if kg is None: From f0a0b6245c47d48a8dc2e31f893e11ba8f1b51f8 Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 17:23:24 -0400 Subject: [PATCH 07/10] fix(management): use explicit None checks 
in update and add tenant check in list - Use `if x is not None` instead of `x or default` in update_connection to preserve intentional falsy values (e.g., empty dict for connection_config) - Add tenant ownership verification in list_for_knowledge_graph as defense-in-depth, consistent with create() Co-Authored-By: Claude Opus 4.6 --- .../application/services/data_source_service.py | 11 +++++++++-- .../application/test_data_source_service.py | 14 ++++++++++++-- src/api/uv.lock | 2 +- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/api/management/application/services/data_source_service.py b/src/api/management/application/services/data_source_service.py index 1dd91de7..0533c1fc 100644 --- a/src/api/management/application/services/data_source_service.py +++ b/src/api/management/application/services/data_source_service.py @@ -249,6 +249,11 @@ async def list_for_knowledge_graph( f"User {user_id} lacks view permission on knowledge graph {kg_id}" ) + # Verify KG belongs to tenant (defense-in-depth) + kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) + if kg is None or kg.tenant_id != self._scope_to_tenant: + raise UnauthorizedError(f"Knowledge graph {kg_id} not accessible") + data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id) self._probe.data_sources_listed( @@ -309,8 +314,10 @@ async def update( async with self._session.begin(): if name is not None or connection_config is not None: ds.update_connection( - name=name or ds.name, - connection_config=connection_config or ds.connection_config, + name=name if name is not None else ds.name, + connection_config=connection_config + if connection_config is not None + else ds.connection_config, credentials_path=ds.credentials_path, updated_by=user_id, ) diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index 71360e11..7577d88a 100644 --- 
a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -370,10 +370,11 @@ class TestDataSourceServiceListForKnowledgeGraph: @pytest.mark.asyncio async def test_list_checks_view_permission_on_kg( - self, service, mock_authz, mock_ds_repo, user_id, kg_id + self, service, mock_authz, mock_ds_repo, mock_kg_repo, user_id, kg_id, tenant_id ): """list_for_knowledge_graph() checks VIEW on the KG.""" mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) mock_ds_repo.find_by_knowledge_graph.return_value = [] await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) @@ -396,10 +397,19 @@ async def test_list_raises_unauthorized_when_denied( @pytest.mark.asyncio async def test_list_returns_data_sources( - self, service, mock_authz, mock_ds_repo, mock_probe, user_id, kg_id + self, + service, + mock_authz, + mock_ds_repo, + mock_kg_repo, + mock_probe, + user_id, + kg_id, + tenant_id, ): """list_for_knowledge_graph() returns data sources from repo.""" mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id) ds1 = _make_ds(ds_id="ds-001") ds2 = _make_ds(ds_id="ds-002") mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2] diff --git a/src/api/uv.lock b/src/api/uv.lock index d9a98e23..b085bb5d 100644 --- a/src/api/uv.lock +++ b/src/api/uv.lock @@ -1171,7 +1171,7 @@ wheels = [ [[package]] name = "kartograph-api" -version = "3.29.0" +version = "3.30.0" source = { virtual = "." 
} dependencies = [ { name = "alembic" }, From b43526d64f5baf1abc68dbe13aed1e91fd30bb29 Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 17:28:11 -0400 Subject: [PATCH 08/10] test(management): add tenant scoping test for list_for_knowledge_graph Verify that list_for_knowledge_graph raises UnauthorizedError when the knowledge graph belongs to a different tenant (defense-in-depth check). Co-Authored-By: Claude Opus 4.6 --- .../application/test_data_source_service.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index 7577d88a..fff5c8f7 100644 --- a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -395,6 +395,19 @@ async def test_list_raises_unauthorized_when_denied( with pytest.raises(UnauthorizedError): await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + @pytest.mark.asyncio + async def test_list_raises_unauthorized_for_different_tenant_kg( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """list_for_knowledge_graph() rejects KG belonging to different tenant.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = _make_kg( + kg_id=kg_id, tenant_id="other-tenant" + ) + + with pytest.raises(UnauthorizedError, match="not accessible"): + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + @pytest.mark.asyncio async def test_list_returns_data_sources( self, From 539e9be7feead59d29071da56ed7fd00d5b4b35f Mon Sep 17 00:00:00 2001 From: John Sell Date: Tue, 17 Mar 2026 17:39:07 -0400 Subject: [PATCH 09/10] test(management): add KG not found test for list_for_knowledge_graph Verify that list_for_knowledge_graph raises UnauthorizedError (not ValueError) when the KG doesn't exist, preventing existence leakage. 
Co-Authored-By: Claude Opus 4.6 --- .../application/test_data_source_service.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index fff5c8f7..e86a4302 100644 --- a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -395,6 +395,17 @@ async def test_list_raises_unauthorized_when_denied( with pytest.raises(UnauthorizedError): await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + @pytest.mark.asyncio + async def test_list_raises_unauthorized_when_kg_not_found( + self, service, mock_authz, mock_kg_repo, user_id, kg_id + ): + """list_for_knowledge_graph() raises UnauthorizedError when KG not found.""" + mock_authz.check_permission.return_value = True + mock_kg_repo.get_by_id.return_value = None + + with pytest.raises(UnauthorizedError, match="not accessible"): + await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) + @pytest.mark.asyncio async def test_list_raises_unauthorized_for_different_tenant_kg( self, service, mock_authz, mock_kg_repo, user_id, kg_id From 090e0349dc6f4338a5c4bf28ca43b70de1f4943e Mon Sep 17 00:00:00 2001 From: John Sell Date: Wed, 18 Mar 2026 10:00:24 -0400 Subject: [PATCH 10/10] test(management): assert tenant_id and credentials in secret store calls Verify that create() and update() pass the correct tenant_id and credentials to the secret store, not just the path prefix. 
Co-Authored-By: Claude Opus 4.6 --- .../application/test_data_source_service.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/api/tests/unit/management/application/test_data_source_service.py b/src/api/tests/unit/management/application/test_data_source_service.py index e86a4302..e1d044f3 100644 --- a/src/api/tests/unit/management/application/test_data_source_service.py +++ b/src/api/tests/unit/management/application/test_data_source_service.py @@ -259,10 +259,10 @@ async def test_create_stores_credentials_when_provided( ) mock_secret_store.store.assert_called_once() - call_kwargs = mock_secret_store.store.call_args - assert "datasource/" in call_kwargs.kwargs.get("path", "") or "datasource/" in ( - call_kwargs.args[0] if call_kwargs.args else "" - ) + call_kwargs = mock_secret_store.store.call_args.kwargs + assert "datasource/" in call_kwargs.get("path", "") + assert call_kwargs.get("tenant_id") == tenant_id + assert call_kwargs.get("credentials") == creds @pytest.mark.asyncio async def test_create_probes_success( @@ -537,6 +537,10 @@ async def test_update_stores_credentials_when_provided( ) mock_secret_store.store.assert_called_once() + call_kwargs = mock_secret_store.store.call_args.kwargs + assert "datasource/" in call_kwargs.get("path", "") + assert call_kwargs.get("tenant_id") == tenant_id + assert call_kwargs.get("credentials") == creds @pytest.mark.asyncio async def test_update_probes_success(