Skip to content

feat(management): add application services with SpiceDB authorization (AIHCM-183)#299

Merged
jsell-rh merged 11 commits intomainfrom
feature/AIHCM-183
Mar 18, 2026
Merged

feat(management): add application services with SpiceDB authorization (AIHCM-183)#299
jsell-rh merged 11 commits intomainfrom
feature/AIHCM-183

Conversation

@jsell-rh
Copy link
Collaborator

@jsell-rh jsell-rh commented Mar 17, 2026

Summary

  • Add KnowledgeGraphService and DataSourceService with SpiceDB authorization on all CRUD operations
  • Permission checks aligned with SpiceDB schema: workspace:edit for KG create, knowledge_graph:view/edit/manage for KG ops, data_source:view/edit/manage for DS ops
  • KG delete cascades: deletes all child DataSources (credentials + records) before KG
  • Credential storage: creates DS aggregate first, stores credentials using generated ID path, sets credentials_path — all in same transaction
  • trigger_sync creates DataSourceSyncRun (status=pending) and emits DataSourceSyncRequested event via new DataSource.request_sync() domain method
  • Tenant scoping enforced on all operations (cross-tenant access returns None/False)
  • get() returns None for both not-found and access-denied (prevents resource existence leakage)
  • DI factory functions wiring repos, outbox, secret store, authz, and tenant scoping
  • Application-level observability probes (Protocol + Default) for both services

Test plan

  • 60 new unit tests covering all CRUD + trigger_sync operations, permission checks, tenant scoping, existence leakage prevention, cascade delete, credential storage
  • 293 total management unit tests pass, zero regressions
  • SpiceDB permission checks verified against schema.zed for all 11 operations

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Data source lifecycle: create, retrieve, list, update, delete, and trigger sync (emits domain sync events).
    • Knowledge graph lifecycle: create, retrieve, list, update, delete (optional cascade to related data).
    • Tenant-scoped authorization enforced on all operations.
    • Observability probes for data source and knowledge graph events with contextual binding.
    • FastAPI dependency providers to wire management services.
  • Tests

    • Comprehensive unit tests covering services and domain sync event behavior.
  • Chores

    • Added UnauthorizedError type for authorization failures.

jsell-rh and others added 5 commits March 17, 2026 16:39
Add KnowledgeGraphServiceProbe and DataSourceServiceProbe following
the Domain-Oriented Observability pattern established in IAM.
Each includes a Protocol interface and DefaultXxxProbe implementation
using structlog with _get_context_kwargs for kwarg collision avoidance.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add KnowledgeGraphService application service implementing CRUD
operations with SpiceDB permission checks (EDIT, VIEW, MANAGE).
Includes cascade delete of DataSources, read_relationships for
workspace-scoped listing, and IntegrityError-to-domain exception
mapping. Also adds UnauthorizedError to management ports.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add DataSourceService application service implementing CRUD + trigger_sync
operations with SpiceDB permission checks. Handles credential storage via
ISecretStoreRepository, KG existence/tenant validation, sync run creation
with DataSourceSyncRequested event emission.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add FastAPI dependency injection factories for KnowledgeGraphService
and DataSourceService, wiring repositories, secret store, authz, and
tenant scoping from CurrentUser. Update architecture test to exclude
management.dependencies from cross-context isolation checks since
DI wiring is a presentation-layer concern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sulate sync request

Security fixes:
- Add tenant_id verification in KG get/update/delete methods
- Add tenant_id verification in DS get/update/delete/trigger_sync methods
- Return None on permission denied in get() to prevent existence leakage

DDD fix:
- Add DataSource.request_sync() domain method instead of directly
  accessing _pending_events from the application service

Tests:
- Add tenant scoping unit tests for all affected methods
- Add get() returns None on permission denied tests
- Add DataSource.request_sync() unit tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 17, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a Management application layer: two application services (DataSourceService, KnowledgeGraphService) with tenant scoping, authorization checks, transactional persistence, secret-store credential handling, and observability probes. Introduces observability probes and default implementations, FastAPI dependency providers that wire services, a DataSource.request_sync domain method emitting a sync event, an UnauthorizedError exception, and extensive unit tests for services and domain behavior.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant DataSourceService
    participant AuthzProvider
    participant KnowledgeGraphRepo
    participant DataSourceRepo
    participant SecretStore
    participant Probe

    User->>DataSourceService: create(user_id, kg_id, name, config, credentials)
    DataSourceService->>AuthzProvider: check EDIT permission on KG
    alt Permission Denied
        AuthzProvider-->>DataSourceService: denied
        DataSourceService->>Probe: permission_denied(user_id, kg_id, EDIT)
        DataSourceService-->>User: UnauthorizedError
    else Permission Granted
        AuthzProvider-->>DataSourceService: allowed
        DataSourceService->>KnowledgeGraphRepo: verify KG exists & tenant
        DataSourceService->>DataSourceRepo: create DataSource aggregate
        alt Raw Credentials Provided
            DataSourceService->>SecretStore: store encrypted credentials
        end
        DataSourceService->>DataSourceRepo: persist DataSource
        DataSourceService->>Probe: data_source_created(ds_id, kg_id, tenant_id, name)
        DataSourceService-->>User: DataSource
    end
Loading
sequenceDiagram
    participant User
    participant DataSourceService
    participant AuthzProvider
    participant DataSourceRepo
    participant SecretStore
    participant Probe

    User->>DataSourceService: delete(user_id, ds_id)
    DataSourceService->>AuthzProvider: check MANAGE permission on DS
    alt Permission Denied
        AuthzProvider-->>DataSourceService: denied
        DataSourceService->>Probe: permission_denied(user_id, ds_id, MANAGE)
        DataSourceService-->>User: UnauthorizedError
    else Permission Granted
        AuthzProvider-->>DataSourceService: allowed
        DataSourceService->>DataSourceRepo: fetch DataSource
        DataSourceService->>DataSourceRepo: validate tenant
        alt Credentials Stored
            DataSourceService->>SecretStore: delete credentials
        end
        DataSourceService->>DataSourceRepo: mark & persist deletion
        DataSourceService->>Probe: data_source_deleted(ds_id)
        DataSourceService-->>User: success
    end
Loading
sequenceDiagram
    participant User
    participant KnowledgeGraphService
    participant AuthzProvider
    participant KnowledgeGraphRepo
    participant DataSourceRepo
    participant Probe

    User->>KnowledgeGraphService: delete(user_id, kg_id)
    KnowledgeGraphService->>AuthzProvider: check MANAGE permission on KG
    alt Permission Denied
        AuthzProvider-->>KnowledgeGraphService: denied
        KnowledgeGraphService->>Probe: permission_denied(user_id, kg_id, MANAGE)
        KnowledgeGraphService-->>User: UnauthorizedError
    else Permission Granted
        AuthzProvider-->>KnowledgeGraphService: allowed
        KnowledgeGraphService->>KnowledgeGraphRepo: fetch & validate tenant
        alt DataSourceRepository available
            KnowledgeGraphService->>DataSourceRepo: list related data sources
            loop For each data source
                KnowledgeGraphService->>DataSourceRepo: mark & delete data source
            end
        end
        KnowledgeGraphService->>KnowledgeGraphRepo: delete knowledge graph
        KnowledgeGraphService->>Probe: knowledge_graph_deleted(kg_id)
        KnowledgeGraphService-->>User: success
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main change: adding application services with SpiceDB authorization to the management context, referenced by ticket AIHCM-183.
Docstring Coverage ✅ Passed Docstring coverage is 84.55% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/AIHCM-183
📝 Coding Plan
  • Generate coding plan for human review comments

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (4)
src/api/management/ports/exceptions.py (1)

31-39: Keep UnauthorizedError out of the port-exception module.

This file is documented as repository/port failures handled by the application layer, but UnauthorizedError is an application/API concern and its docstring now mentions HTTP 403. Moving it to an application/common exceptions module would keep the boundary clear.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/ports/exceptions.py` around lines 31 - 39,
UnauthorizedError is an application-level/API concern and should be removed from
the port-level exceptions; delete the UnauthorizedError class from this port
exceptions module and add it to the application/common exceptions module (create
or update a common exceptions file) with its HTTP-403-focused docstring. Update
any imports referencing UnauthorizedError to point to the new application/common
exceptions module, and ensure the original port exceptions module only contains
repository/port-related exception classes.
src/api/management/application/observability/data_source_service_probe.py (1)

33-38: Prefer stable failure codes over raw error strings.

These probe methods accept arbitrary strings and log them verbatim. On create/delete paths that now touch persistence and the secret store, that makes it too easy to push backend details into logs and it creates high-cardinality telemetry. Prefer structured fields like error_code / error_type, and capture stack traces at the catch site only when needed. The same contract exists in src/api/management/application/observability/knowledge_graph_service_probe.py.

Also applies to: 64-68, 155-169, 209-221

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/observability/data_source_service_probe.py`
around lines 33 - 38, The probe methods (e.g., data_source_creation_failed)
currently accept raw error strings; change their signature to accept a stable
structured identifier (e.g., error_code: str or error_type: str) and an optional
sanitized message flag instead of raw error text, update the body to log only
the structured field and any safe, low-cardinality metadata, and remove/avoid
logging stack traces here; apply the same change to the other probe methods
mentioned (the similar create/delete/secret-store failure methods referenced in
this file and the corresponding methods in knowledge_graph_service_probe.py),
and update all callers/tests to pass an error_code (and only capture full
stacktrace at the catch site if needed).
src/api/management/application/services/data_source_service.py (1)

436-448: Move ULID import to module level.

The from ulid import ULID import on Line 437 is placed inside the function body. This should be moved to the top of the file with other imports for consistency and slight performance benefit (avoiding repeated import lookups).

♻️ Proposed fix

Add at the top of the file with other imports:

from ulid import ULID

Then remove Line 437 and update usage:

         async with self._session.begin():
-            from ulid import ULID
-
             sync_run = DataSourceSyncRun(
                 id=str(ULID()),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
436 - 448, Move the local import "from ulid import ULID" out of the function and
add it to the module-level imports; remove the in-function import inside the
async with block that creates DataSourceSyncRun and keep using ULID() when
building the DataSourceSyncRun (id=str(ULID())) before calling
self._sync_run_repo.save(sync_run) so behavior is unchanged but the import is at
the top of the file.
src/api/management/application/services/knowledge_graph_service.py (1)

254-259: Consider batch fetching KGs to avoid N+1 queries.

The current implementation fetches each KG individually in a loop. For workspaces with many knowledge graphs, this could result in performance degradation. Consider adding a batch fetch method to the repository if this becomes a concern in production.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/knowledge_graph_service.py` around
lines 254 - 259, The loop that calls
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each kg_id causes N+1
queries; add and use a batch fetch on the repository (e.g., get_by_ids or
get_many) to retrieve all KnowledgeGraph objects in one call, convert the kg_ids
into the appropriate KnowledgeGraphId list, then filter the returned list by
tenant_id == self._scope_to_tenant and assign to kgs instead of appending in a
loop; update the repository interface and usages of _kg_repo.get_by_id
accordingly (keep KnowledgeGraphId and _scope_to_tenant as the identity/tenant
checks).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 194-198: The docstring for DataSourceService.get is inconsistent:
it claims it raises UnauthorizedError but the implementation returns None to
avoid leaking existence; update the docstring (for the method named get on
DataSourceService) to reflect that it returns None when the caller lacks
permission or the resource doesn't exist and remove or replace the "Raises:
UnauthorizedError" section accordingly, mirroring the style used in
KnowledgeGraphService.get.

In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 176-179: Update the docstring on the method in
KnowledgeGraphService that currently claims it raises UnauthorizedError: change
the "Raises" section to instead state that the method returns None when the
caller lacks VIEW permission (to avoid existence leakage) and clarify the return
contract as "KnowledgeGraph aggregate, or None if not found or if the caller
lacks VIEW permission"; reference the UnauthorizedError symbol and the method in
KnowledgeGraphService so the implementer updates the docstring to match the
existing permission-check behavior that returns None.

In `@src/api/management/domain/aggregates/data_source.py`:
- Around line 215-236: The DataSourceSyncRequested event emitted by request_sync
lacks the sync-run identifier which prevents consumers from correlating
overlapping runs; update the event payload and call site to include sync_run_id:
extend the DataSourceSyncRequested event definition to accept sync_run_id,
change the request_sync method in the DataSource aggregate to obtain the
currently pending DataSourceSyncRun id (or accept sync_run_id as an explicit
parameter) and include it when appending the DataSourceSyncRequested event, and
then propagate this change through the trigger_sync flow, the service that
invokes request_sync, and affected tests to assert the sync_run_id is present.

---

Nitpick comments:
In `@src/api/management/application/observability/data_source_service_probe.py`:
- Around line 33-38: The probe methods (e.g., data_source_creation_failed)
currently accept raw error strings; change their signature to accept a stable
structured identifier (e.g., error_code: str or error_type: str) and an optional
sanitized message flag instead of raw error text, update the body to log only
the structured field and any safe, low-cardinality metadata, and remove/avoid
logging stack traces here; apply the same change to the other probe methods
mentioned (the similar create/delete/secret-store failure methods referenced in
this file and the corresponding methods in knowledge_graph_service_probe.py),
and update all callers/tests to pass an error_code (and only capture full
stacktrace at the catch site if needed).

In `@src/api/management/application/services/data_source_service.py`:
- Around line 436-448: Move the local import "from ulid import ULID" out of the
function and add it to the module-level imports; remove the in-function import
inside the async with block that creates DataSourceSyncRun and keep using ULID()
when building the DataSourceSyncRun (id=str(ULID())) before calling
self._sync_run_repo.save(sync_run) so behavior is unchanged but the import is at
the top of the file.

In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 254-259: The loop that calls
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each kg_id causes N+1
queries; add and use a batch fetch on the repository (e.g., get_by_ids or
get_many) to retrieve all KnowledgeGraph objects in one call, convert the kg_ids
into the appropriate KnowledgeGraphId list, then filter the returned list by
tenant_id == self._scope_to_tenant and assign to kgs instead of appending in a
loop; update the repository interface and usages of _kg_repo.get_by_id
accordingly (keep KnowledgeGraphId and _scope_to_tenant as the identity/tenant
checks).

In `@src/api/management/ports/exceptions.py`:
- Around line 31-39: UnauthorizedError is an application-level/API concern and
should be removed from the port-level exceptions; delete the UnauthorizedError
class from this port exceptions module and add it to the application/common
exceptions module (create or update a common exceptions file) with its
HTTP-403-focused docstring. Update any imports referencing UnauthorizedError to
point to the new application/common exceptions module, and ensure the original
port exceptions module only contains repository/port-related exception classes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: da219060-93a1-45dc-8842-854445b3d163

📥 Commits

Reviewing files that changed from the base of the PR and between d06aaaf and b5613b3.

📒 Files selected for processing (17)
  • src/api/management/application/__init__.py
  • src/api/management/application/observability/__init__.py
  • src/api/management/application/observability/data_source_service_probe.py
  • src/api/management/application/observability/knowledge_graph_service_probe.py
  • src/api/management/application/services/__init__.py
  • src/api/management/application/services/data_source_service.py
  • src/api/management/application/services/knowledge_graph_service.py
  • src/api/management/dependencies/__init__.py
  • src/api/management/dependencies/data_source.py
  • src/api/management/dependencies/knowledge_graph.py
  • src/api/management/domain/aggregates/data_source.py
  • src/api/management/ports/exceptions.py
  • src/api/tests/unit/management/application/__init__.py
  • src/api/tests/unit/management/application/test_data_source_service.py
  • src/api/tests/unit/management/application/test_knowledge_graph_service.py
  • src/api/tests/unit/management/test_architecture.py
  • src/api/tests/unit/management/test_data_source.py

Comment on lines +215 to +236
def request_sync(self, *, requested_by: str | None = None) -> None:
"""Request a sync for this data source.

Records a DataSourceSyncRequested event.

Args:
requested_by: The user who requested the sync (optional)

Raises:
AggregateDeletedError: If the data source has been marked for deletion
"""
if self._deleted:
raise AggregateDeletedError("Cannot request sync on a deleted data source")
self._pending_events.append(
DataSourceSyncRequested(
data_source_id=self.id.value,
knowledge_graph_id=self.knowledge_graph_id,
tenant_id=self.tenant_id,
occurred_at=datetime.now(UTC),
requested_by=requested_by,
)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Include the sync-run identifier in the emitted event.

Per the new trigger_sync flow, a pending DataSourceSyncRun exists before this event is raised, but the payload only carries data_source_id. If two requests for the same source overlap, downstream consumers cannot tell which run they should reconcile. Add sync_run_id to DataSourceSyncRequested, or enforce a one-pending-run invariant before emitting it.

Possible direction
-    def request_sync(self, *, requested_by: str | None = None) -> None:
+    def request_sync(
+        self,
+        *,
+        sync_run_id: str,
+        requested_by: str | None = None,
+    ) -> None:
         """Request a sync for this data source."""
         if self._deleted:
             raise AggregateDeletedError("Cannot request sync on a deleted data source")
         self._pending_events.append(
             DataSourceSyncRequested(
                 data_source_id=self.id.value,
+                sync_run_id=sync_run_id,
                 knowledge_graph_id=self.knowledge_graph_id,
                 tenant_id=self.tenant_id,
                 occurred_at=datetime.now(UTC),
                 requested_by=requested_by,
             )
         )

This also needs to be threaded through the event definition, service call site, and tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/domain/aggregates/data_source.py` around lines 215 - 236,
The DataSourceSyncRequested event emitted by request_sync lacks the sync-run
identifier which prevents consumers from correlating overlapping runs; update
the event payload and call site to include sync_run_id: extend the
DataSourceSyncRequested event definition to accept sync_run_id, change the
request_sync method in the DataSource aggregate to obtain the currently pending
DataSourceSyncRun id (or accept sync_run_id as an explicit parameter) and
include it when appending the DataSourceSyncRequested event, and then propagate
this change through the trigger_sync flow, the service that invokes
request_sync, and affected tests to assert the sync_run_id is present.

…level

- Update get() docstrings in both services to document None return on
  permission denied (removed stale UnauthorizedError raises section)
- Move ULID import from local scope to module level in DataSourceService

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
src/api/management/application/services/knowledge_graph_service.py (1)

254-257: Consider batch fetching to avoid N+1 queries.

The loop fetches each KnowledgeGraph individually, resulting in N database queries. For workspaces with many knowledge graphs, consider adding a batch method like get_by_ids() to the repository.

♻️ Suggested approach
-        # Fetch each KG from repo and filter by tenant
-        kgs: list[KnowledgeGraph] = []
-        for kg_id in kg_ids:
-            kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
-            if kg is not None and kg.tenant_id == self._scope_to_tenant:
-                kgs.append(kg)
+        # Batch fetch all KGs and filter by tenant
+        all_kgs = await self._kg_repo.get_by_ids(
+            [KnowledgeGraphId(value=kg_id) for kg_id in kg_ids]
+        )
+        kgs = [kg for kg in all_kgs if kg.tenant_id == self._scope_to_tenant]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/knowledge_graph_service.py` around
lines 254 - 257, The current loop over kg_ids calls
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each id causing N+1
queries; add a batch repository method (e.g., get_by_ids(ids:
List[KnowledgeGraphId]) or get_by_ids_raw(ids: List[str])) on the KG repo and
replace the loop with a single call that returns all KGs, then filter returned
items by tenant_id == self._scope_to_tenant and extend kgs. Ensure you use the
existing KnowledgeGraphId type when constructing ids for the batch call and
update any callers/tests accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 153-172: The secret-store calls (self._secret_store.store and
delete) are executed inside the DB transaction begun with async with
self._session.begin(), risking divergence if DB commit fails; refactor so the DB
save (self._ds_repo.save / DataSource.create flow) completes and the transaction
is committed first, then perform secret-store.store/delete afterwards and, on
secret-store failure, apply a compensating DB update or enqueue an outbox
message to reconcile; locate usages around the async with self._session.begin()
block that creates/saves DataSource and move credential persistence out of that
block, ensuring ds.credentials_path is updated only after successful secret
storage or handled via compensating action/outbox if storage fails.
- Around line 310-314: The call to ds.update_connection uses truthy fallbacks
that ignore intentional falsy values; change the arguments to use explicit None
checks so empty-but-valid values are passed through—e.g., replace name=name or
ds.name with name=name if name is not None else ds.name and
connection_config=connection_config if connection_config is not None else
ds.connection_config (leave credentials_path as-is), and keep the outer
conditional that checks whether name is not None or connection_config is not
None.
- Line 252: Add an explicit tenant ownership check in list_for_knowledge_graph
before calling self._ds_repo.find_by_knowledge_graph(kg_id): load the KG via
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) and if it is None or
kg.tenant_id != self._scope_to_tenant raise UnauthorizedError; only then call
_ds_repo.find_by_knowledge_graph to ensure the knowledge graph belongs to the
current tenant (defense-in-depth consistent with create()).

---

Nitpick comments:
In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 254-257: The current loop over kg_ids calls
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each id causing N+1
queries; add a batch repository method (e.g., get_by_ids(ids:
List[KnowledgeGraphId]) or get_by_ids_raw(ids: List[str])) on the KG repo and
replace the loop with a single call that returns all KGs, then filter returned
items by tenant_id == self._scope_to_tenant and extend kgs. Ensure you use the
existing KnowledgeGraphId type when constructing ids for the batch call and
update any callers/tests accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2c1e79f1-9b95-4a87-8a82-7d98a58c7c58

📥 Commits

Reviewing files that changed from the base of the PR and between b5613b3 and 5e61e93.

📒 Files selected for processing (2)
  • src/api/management/application/services/data_source_service.py
  • src/api/management/application/services/knowledge_graph_service.py

Comment on lines +153 to +172
async with self._session.begin():
ds = DataSource.create(
knowledge_graph_id=kg_id,
tenant_id=self._scope_to_tenant,
name=name,
adapter_type=adapter_type,
connection_config=connection_config,
created_by=user_id,
)

if raw_credentials is not None:
cred_path = f"datasource/{ds.id.value}/credentials"
await self._secret_store.store(
path=cred_path,
tenant_id=self._scope_to_tenant,
credentials=raw_credentials,
)
ds.credentials_path = cred_path

await self._ds_repo.save(ds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Non-atomic secret-store side effects can desynchronize credentials from DB state.

self._secret_store.store/delete(...) is executed inside async with self._session.begin() (Line 165, Line 320, Line 379). If DB commit fails after a secret operation succeeds, DB and secret store diverge (orphaned/deleted credentials with opposite DB state). This is a critical consistency risk.

Please isolate secret-store side effects from DB transaction boundaries (e.g., post-commit with compensating actions, or transactional outbox workflow).

Also applies to: 318-327, 377-386

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
153 - 172, The secret-store calls (self._secret_store.store and delete) are
executed inside the DB transaction begun with async with self._session.begin(),
risking divergence if DB commit fails; refactor so the DB save
(self._ds_repo.save / DataSource.create flow) completes and the transaction is
committed first, then perform secret-store.store/delete afterwards and, on
secret-store failure, apply a compensating DB update or enqueue an outbox
message to reconcile; locate usages around the async with self._session.begin()
block that creates/saves DataSource and move credential persistence out of that
block, ensuring ds.credentials_path is updated only after successful secret
storage or handled via compensating action/outbox if storage fails.

…eck in list

- Use `if x is not None` instead of `x or default` in update_connection
  to preserve intentional falsy values (e.g., empty dict for connection_config)
- Add tenant ownership verification in list_for_knowledge_graph as
  defense-in-depth, consistent with create()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/api/tests/unit/management/application/test_data_source_service.py (1)

368-424: Consider adding test for cross-tenant KG rejection in list_for_knowledge_graph.

The service now includes a defense-in-depth tenant check (lines 252-255 in the service), but there's no test verifying UnauthorizedError is raised when the KG belongs to a different tenant after permission check passes.

💡 Suggested test to add
`@pytest.mark.asyncio`
async def test_list_raises_unauthorized_for_different_tenant_kg(
    self, service, mock_authz, mock_kg_repo, user_id, kg_id
):
    """list_for_knowledge_graph() raises UnauthorizedError when KG belongs to different tenant."""
    mock_authz.check_permission.return_value = True
    mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id="other-tenant")

    with pytest.raises(UnauthorizedError, match="not accessible"):
        await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/tests/unit/management/application/test_data_source_service.py` around
lines 368 - 424, Add a unit test to verify list_for_knowledge_graph enforces the
defense-in-depth tenant check: create a pytest async test (e.g.,
test_list_raises_unauthorized_for_different_tenant_kg) that sets
mock_authz.check_permission.return_value = True, sets
mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id,
tenant_id="other-tenant"), then asserts that awaiting
service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) raises
UnauthorizedError (optionally matching "not accessible"); ensure the test
references list_for_knowledge_graph, mock_authz, mock_kg_repo and _make_kg so it
fails if the tenant rejection is not enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/api/tests/unit/management/application/test_data_source_service.py`:
- Around line 368-424: Add a unit test to verify list_for_knowledge_graph
enforces the defense-in-depth tenant check: create a pytest async test (e.g.,
test_list_raises_unauthorized_for_different_tenant_kg) that sets
mock_authz.check_permission.return_value = True, sets
mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id,
tenant_id="other-tenant"), then asserts that awaiting
service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) raises
UnauthorizedError (optionally matching "not accessible"); ensure the test
references list_for_knowledge_graph, mock_authz, mock_kg_repo and _make_kg so it
fails if the tenant rejection is not enforced.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5b121947-39cd-4187-80c1-92b3da94a178

📥 Commits

Reviewing files that changed from the base of the PR and between 5e61e93 and f0a0b62.

⛔ Files ignored due to path filters (1)
  • src/api/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • src/api/management/application/services/data_source_service.py
  • src/api/tests/unit/management/application/test_data_source_service.py

Verify that list_for_knowledge_graph raises UnauthorizedError when the
knowledge graph belongs to a different tenant (defense-in-depth check).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/api/tests/unit/management/application/test_data_source_service.py`:
- Around line 368-437: Add a test to TestDataSourceServiceListForKnowledgeGraph
covering the "KG not found" case: ensure mock_authz.check_permission returns
True and mock_kg_repo.get_by_id returns None, then call
DataSourceService.list_for_knowledge_graph and assert it raises a ValueError
(match "not found"); reference the existing pattern used in other tests (e.g.,
test_list_raises_unauthorized_for_different_tenant_kg) and use the same fixtures
(service, mock_authz, mock_kg_repo, user_id, kg_id).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b43ede58-aa59-4eb4-ac2c-8a634c66e8ac

📥 Commits

Reviewing files that changed from the base of the PR and between f0a0b62 and b43526d.

📒 Files selected for processing (1)
  • src/api/tests/unit/management/application/test_data_source_service.py

Comment on lines +368 to +437
class TestDataSourceServiceListForKnowledgeGraph:
"""Tests for DataSourceService.list_for_knowledge_graph."""

@pytest.mark.asyncio
async def test_list_checks_view_permission_on_kg(
self, service, mock_authz, mock_ds_repo, mock_kg_repo, user_id, kg_id, tenant_id
):
"""list_for_knowledge_graph() checks VIEW on the KG."""
mock_authz.check_permission.return_value = True
mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id)
mock_ds_repo.find_by_knowledge_graph.return_value = []

await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

mock_authz.check_permission.assert_called_once_with(
resource=f"knowledge_graph:{kg_id}",
permission=Permission.VIEW,
subject=f"user:{user_id}",
)

@pytest.mark.asyncio
async def test_list_raises_unauthorized_when_denied(
self, service, mock_authz, user_id, kg_id
):
"""list_for_knowledge_graph() raises UnauthorizedError when denied."""
mock_authz.check_permission.return_value = False

with pytest.raises(UnauthorizedError):
await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

@pytest.mark.asyncio
async def test_list_raises_unauthorized_for_different_tenant_kg(
self, service, mock_authz, mock_kg_repo, user_id, kg_id
):
"""list_for_knowledge_graph() rejects KG belonging to different tenant."""
mock_authz.check_permission.return_value = True
mock_kg_repo.get_by_id.return_value = _make_kg(
kg_id=kg_id, tenant_id="other-tenant"
)

with pytest.raises(UnauthorizedError, match="not accessible"):
await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

@pytest.mark.asyncio
async def test_list_returns_data_sources(
self,
service,
mock_authz,
mock_ds_repo,
mock_kg_repo,
mock_probe,
user_id,
kg_id,
tenant_id,
):
"""list_for_knowledge_graph() returns data sources from repo."""
mock_authz.check_permission.return_value = True
mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id)
ds1 = _make_ds(ds_id="ds-001")
ds2 = _make_ds(ds_id="ds-002")
mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2]

result = await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

assert len(result) == 2
mock_probe.data_sources_listed.assert_called_once_with(
kg_id=kg_id,
count=2,
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing test case: KG not found scenario.

The test class covers authorization denial and cross-tenant rejection, but doesn't test the case where the knowledge graph doesn't exist at all (kg_repo.get_by_id returns None). The create tests have this coverage (test_create_verifies_kg_exists_and_belongs_to_tenant), but it's missing here.

Consider adding a test like:

`@pytest.mark.asyncio`
async def test_list_raises_value_error_when_kg_not_found(
    self, service, mock_authz, mock_kg_repo, user_id, kg_id
):
    """list_for_knowledge_graph() raises ValueError when KG not found."""
    mock_authz.check_permission.return_value = True
    mock_kg_repo.get_by_id.return_value = None

    with pytest.raises(ValueError, match="not found"):
        await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/tests/unit/management/application/test_data_source_service.py` around
lines 368 - 437, Add a test to TestDataSourceServiceListForKnowledgeGraph
covering the "KG not found" case: ensure mock_authz.check_permission returns
True and mock_kg_repo.get_by_id returns None, then call
DataSourceService.list_for_knowledge_graph and assert it raises a ValueError
(match "not found"); reference the existing pattern used in other tests (e.g.,
test_list_raises_unauthorized_for_different_tenant_kg) and use the same fixtures
(service, mock_authz, mock_kg_repo, user_id, kg_id).

Verify that list_for_knowledge_graph raises UnauthorizedError (not
ValueError) when the KG doesn't exist, preventing existence leakage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
src/api/tests/unit/management/application/test_data_source_service.py (1)

236-266: Strengthen secret-store write assertions with tenant scoping checks

These tests verify that credentials are stored, but they don’t assert that the tenant_id is passed to secret_store.store. Adding that assertion in both create/update credential tests would better protect tenant-isolation guarantees from regressions.

Also applies to: 524-540

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/tests/unit/management/application/test_data_source_service.py` around
lines 236 - 266, Add assertions to the existing tests (e.g.,
test_create_stores_credentials_when_provided and the corresponding update test)
to verify the tenant scoping is passed to the secret store: after calling
service.create/service.update, inspect mock_secret_store.store.call_args and
assert that the tenant_id appears in the call (either in kwargs like
call_args.kwargs.get("tenant_id") or in args) in addition to the existing
"datasource/" path check; reference mock_secret_store.store and the test
function names to locate where to add the extra assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/api/tests/unit/management/application/test_data_source_service.py`:
- Around line 236-266: Add assertions to the existing tests (e.g.,
test_create_stores_credentials_when_provided and the corresponding update test)
to verify the tenant scoping is passed to the secret store: after calling
service.create/service.update, inspect mock_secret_store.store.call_args and
assert that the tenant_id appears in the call (either in kwargs like
call_args.kwargs.get("tenant_id") or in args) in addition to the existing
"datasource/" path check; reference mock_secret_store.store and the test
function names to locate where to add the extra assertion.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3147f813-26d0-4a1b-af35-d8724b73ff19

📥 Commits

Reviewing files that changed from the base of the PR and between b43526d and 539e9be.

📒 Files selected for processing (1)
  • src/api/tests/unit/management/application/test_data_source_service.py

Verify that create() and update() pass the correct tenant_id and
credentials to the secret store, not just the path prefix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jsell-rh jsell-rh merged commit 9e9de5e into main Mar 18, 2026
10 checks passed
@jsell-rh jsell-rh deleted the feature/AIHCM-183 branch March 18, 2026 14:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant