
feat(management): add FastAPI routes and dependency injection (AIHCM-185)#303

Open
jsell-rh wants to merge 9 commits into main from feature/AIHCM-185

Conversation

jsell-rh (Collaborator) commented Mar 18, 2026

Summary

Completes the Management context walking skeleton with REST API endpoints for KnowledgeGraph and DataSource CRUD.

Endpoints

| Method | Path | Description | Status |
| --- | --- | --- | --- |
| POST | /management/workspaces/{ws_id}/knowledge-graphs | Create KG | 201 |
| GET | /management/workspaces/{ws_id}/knowledge-graphs | List KGs (paginated) | 200 |
| GET | /management/knowledge-graphs/{kg_id} | Get KG | 200 |
| PATCH | /management/knowledge-graphs/{kg_id} | Update KG (partial) | 200 |
| DELETE | /management/knowledge-graphs/{kg_id} | Delete KG (cascade) | 204 |
| POST | /management/knowledge-graphs/{kg_id}/data-sources | Create DS | 201 |
| GET | /management/knowledge-graphs/{kg_id}/data-sources | List DSes (paginated) | 200 |
| GET | /management/data-sources/{ds_id} | Get DS | 200 |
| PATCH | /management/data-sources/{ds_id} | Update DS (partial) | 200 |
| DELETE | /management/data-sources/{ds_id} | Delete DS | 204 |
| POST | /management/data-sources/{ds_id}/sync | Trigger sync | 202 |

Pre-work fixes included

  • DataSourceService.create() and update() now catch IntegrityError and raise DuplicateDataSourceNameError
  • KnowledgeGraphService.update() supports partial updates (optional name/description)

Key design decisions

  • Pagination from day 1: offset, limit (default 20, max 100), total, items
  • Credentials are write-only: has_credentials: bool in response, never expose secrets
  • PATCH semantics: optional fields, existing values preserved when None
  • get() returns 404 for both not-found and access-denied (no existence leakage)
  • Generic error messages in responses (no internal IDs leaked)
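
The pagination envelope and write-only credential flag described above can be sketched as Pydantic models. This is a minimal illustration of the response shape, not the PR's actual code; the real models carry additional fields (IDs, timestamps, descriptions):

```python
# Hypothetical sketch of the list/response models implied by the design
# decisions above; field names (items, total, offset, limit, has_credentials)
# come from the PR description, the rest is illustrative.
from pydantic import BaseModel, Field


class DataSourceResponse(BaseModel):
    id: str
    name: str
    # Credentials are write-only: responses carry only a boolean flag,
    # never the secret material itself.
    has_credentials: bool


class DataSourceListResponse(BaseModel):
    items: list[DataSourceResponse]
    total: int
    # Pagination metadata echoed back to the client (default 20, max 100).
    offset: int = Field(0, ge=0)
    limit: int = Field(20, ge=1, le=100)
```

Returning `total` alongside `offset`/`limit` lets clients compute page counts without a separate count request.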

Test plan

  • 41 new unit tests covering all routes, status codes, error handling, pagination
  • 336 total management unit tests pass, zero regressions
  • Integration tests for KG create/list/get/update/delete
  • Integration tests for DS create (with/without credentials), get, list, update, delete, trigger sync
  • Integration tests for authorization denial (403)
  • Review fixes: 404 for not-found (not 400), defensive UnauthorizedError on get, generic trigger_sync error messages

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Management APIs: full data source and knowledge-graph CRUD + sync under /management.
    • Listing endpoints support pagination and return total/offset/limit metadata.
    • Secure credential storage for data sources (credentials stored and not exposed).
  • Bug Fixes

    • Clear duplicate-name conflict handling for data sources (409 responses).
    • Knowledge-graph updates accept partial changes (optional name/description).
  • Tests

    • Extensive new unit and integration tests covering APIs, routes, repositories, and pagination.

jsell-rh and others added 5 commits March 18, 2026 10:27
…ate/update

The create() and update() methods in DataSourceService did not catch
IntegrityError for duplicate data source names within a knowledge graph.
This adds the same pattern used in KnowledgeGraphService.create() to
raise DuplicateDataSourceNameError when the uq_data_sources_kg_name
constraint is violated.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
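
The translation this commit describes can be sketched as follows. A stand-in class replaces sqlalchemy.exc.IntegrityError so the example stays self-contained; in the real service the check wraps the save() call:

```python
# Sketch of mapping a constraint violation to a domain error. IntegrityError
# here is a stand-in for sqlalchemy.exc.IntegrityError; the constraint name
# uq_data_sources_kg_name comes from the commit message.
class IntegrityError(Exception):
    """Stand-in for sqlalchemy.exc.IntegrityError."""

    def __init__(self, orig: Exception) -> None:
        super().__init__(str(orig))
        self.orig = orig


class DuplicateDataSourceNameError(Exception):
    """A data source with this name already exists in the knowledge graph."""


def translate_save_error(exc: IntegrityError) -> None:
    # Only the duplicate-name constraint becomes a domain error; any other
    # integrity violation is re-raised unchanged.
    if "uq_data_sources_kg_name" in str(exc.orig):
        raise DuplicateDataSourceNameError("data source name already exists") from exc
    raise exc
```

Keeping the constraint-name check in one place means the route layer only ever sees the typed domain error, which it can map to 409.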
…date()

Change update() signature to accept optional name and description
parameters. When None is passed, the existing values are preserved.
This enables PATCH semantics in the upcoming route handlers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
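
The None-preserving PATCH semantics this commit describes can be sketched like this (an illustrative stand-in; the real KnowledgeGraph aggregate carries more fields and emits domain events):

```python
# Sketch of partial-update semantics: None means "field not provided",
# so the existing value is preserved. Names here are illustrative.
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class KnowledgeGraph:
    name: str
    description: str


def update_kg(kg: KnowledgeGraph, name: str | None = None, description: str | None = None) -> KnowledgeGraph:
    return replace(
        kg,
        name=name if name is not None else kg.name,
        description=description if description is not None else kg.description,
    )
```

One known trade-off of None-as-absent: a client cannot use PATCH to clear a field back to null, which is fine when the fields are required anyway.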
…-185)

Add FastAPI routes for Knowledge Graph and Data Source CRUD operations:

Knowledge Graph routes:
- POST /management/workspaces/{workspace_id}/knowledge-graphs
- GET /management/workspaces/{workspace_id}/knowledge-graphs (paginated)
- GET /management/knowledge-graphs/{kg_id}
- PATCH /management/knowledge-graphs/{kg_id}
- DELETE /management/knowledge-graphs/{kg_id}

Data Source routes:
- POST /management/knowledge-graphs/{kg_id}/data-sources
- GET /management/knowledge-graphs/{kg_id}/data-sources (paginated)
- GET /management/data-sources/{ds_id}
- PATCH /management/data-sources/{ds_id}
- DELETE /management/data-sources/{ds_id}
- POST /management/data-sources/{ds_id}/sync (202 Accepted)

Includes Pydantic request/response models, DI wiring, router
registration in main.py, and 41 unit tests covering happy paths,
error handling (403/404/409/422), and pagination.

Architecture test updated to exclude management.presentation from
IAM import restriction (legitimate auth dependency at the boundary).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add integration tests that exercise the full stack: HTTP request ->
route -> DI -> service -> SpiceDB + PostgreSQL -> response.

Tests cover:
- KG creation, listing with pagination, get, update, delete
- DS creation with/without credentials, sync trigger (202)
- Authorization enforcement (403 for unauthorized users)
- Duplicate name detection (409)
- Not found handling (404)

Includes shared conftest with cleanup fixtures, SpiceDB permission
helpers, and encryption key setup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… and expand integration tests

- Return 404 (not 400) for not-found resources in update routes
- Add defensive UnauthorizedError handler on get routes
- Use generic error messages in trigger_sync to prevent information leakage
- Add integration tests for DS get, list, update, and delete endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
coderabbitai bot commented Mar 18, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 232ee5ca-2af3-412d-8b2e-eb63d6937bc8

📥 Commits

Reviewing files that changed from the base of the PR and between 5b7d0a6 and 800e749.

⛔ Files ignored due to path filters (1)
  • src/api/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • src/api/management/application/services/knowledge_graph_service.py
  • src/api/tests/integration/management/conftest.py

Walkthrough

This PR introduces a Management bounded context and wires its router into the main FastAPI app. It adds presentation layers (APIRouters and Pydantic models) for knowledge graphs and data sources, implements paginated repository/service APIs returning (items, total), adds tenant-aware credential write-through to a secret store with duplicate-name handling, updates service/repository signatures, and adds extensive unit and integration tests plus updated test fixtures for authorization and lifecycle management.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant RouteHandler as "Route Handler"
    participant Auth as "Authorization"
    participant Service as "Service Layer"
    participant SecretStore as "Secret Store"
    participant DB as "Database"

    Client->>RouteHandler: POST /management/knowledge-graphs/{kg_id}/data-sources
    RouteHandler->>Auth: verify permissions
    Auth-->>RouteHandler: allowed
    RouteHandler->>Service: create(kg_id, request, user_id)
    Service->>DB: insert data_source (begin tx)
    alt IntegrityError: duplicate name
        DB-->>Service: IntegrityError (uq_data_sources_kg_name)
        Service->>Service: emit probe (data_source_creation_failed)
        Service-->>RouteHandler: DuplicateDataSourceNameError
        RouteHandler-->>Client: 409 Conflict
    else success
        Service->>SecretStore: store credentials (tenant_id, raw_credentials)
        SecretStore-->>Service: credentials_path
        Service->>DB: update data_source.credentials_path (within tx) and commit
        DB-->>Service: commit success
        Service->>Service: emit probe (data_source_created)
        Service-->>RouteHandler: DataSourceResponse
        RouteHandler-->>Client: 201 Created
    end
```
```mermaid
sequenceDiagram
    participant Client
    participant RouteHandler as "Route Handler"
    participant Auth as "Authorization"
    participant Service as "Service Layer"
    participant DB as "Database"

    Client->>RouteHandler: GET /management/workspaces/{workspace_id}/knowledge-graphs?offset=&limit=
    RouteHandler->>Auth: verify permissions
    Auth-->>RouteHandler: allowed
    RouteHandler->>Service: list_for_workspace(user_id, workspace_id, offset, limit)
    Service->>DB: query paginated KGs + count total
    DB-->>Service: (items_page, total)
    Service->>Service: emit probe (knowledge_graphs_listed)
    Service-->>RouteHandler: (items_page, total)
    RouteHandler-->>Client: 200 OK (items, total, offset, limit)
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The title clearly and specifically describes the main change: adding FastAPI routes and dependency injection to the management context. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 94.44%, which meets the required threshold of 80.00%. |


jsell-rh and others added 2 commits March 18, 2026 10:49
…nation

- Add generic except Exception catch-all to all route handlers matching
  IAM pattern, preventing unhandled exceptions from leaking stack traces
- Move pagination from application-layer slicing to database-level
  LIMIT/OFFSET queries in repositories
- Repository find methods now return (items, total_count) tuples
- Service list methods pass through pagination parameters
- Routes pass offset/limit query params directly to services

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
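
The move from application-layer slicing to database-level LIMIT/OFFSET can be sketched with stdlib sqlite3. The PR uses SQLAlchemy repositories, but the query shape (one page of rows plus a COUNT, returned as an (items, total) tuple) is the same:

```python
# Illustrative repository-style helper showing database-level pagination;
# table and column names are stand-ins, not the PR's actual schema.
import sqlite3


def find_for_kg(conn: sqlite3.Connection, kg_id: str, offset: int, limit: int) -> tuple[list[str], int]:
    """Return one page of data source names plus the total count."""
    total = conn.execute(
        "SELECT COUNT(*) FROM data_sources WHERE kg_id = ?", (kg_id,)
    ).fetchone()[0]
    rows = conn.execute(
        "SELECT name FROM data_sources WHERE kg_id = ? ORDER BY name LIMIT ? OFFSET ?",
        (kg_id, limit, offset),
    ).fetchall()
    return [r[0] for r in rows], total
```

Only the requested page ever leaves the database, so memory and latency stay flat as the collection grows.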
… transaction handling

- Update repository integration tests to wrap all raw SQL operations
  in async with session.begin() blocks instead of bare execute+commit
- Fix transaction-already-begun errors by ensuring every database
  operation uses explicit transaction boundaries
- Fix conftest clean_management_data fixture to use begin() context
  manager

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
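
The explicit-transaction-boundary idea behind this commit can be illustrated with stdlib sqlite3 (the PR itself uses SQLAlchemy's `async with session.begin()`): group related statements in one atomic block instead of bare execute + commit, so a mid-block failure rolls everything back.

```python
# Illustration of explicit transaction boundaries with sqlite3's
# connection context manager; schema is a stand-in.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE knowledge_graphs (id TEXT PRIMARY KEY)")

try:
    with conn:  # opens a transaction; commits on success, rolls back on error
        conn.execute("INSERT INTO knowledge_graphs VALUES ('kg-1')")
        conn.execute("INSERT INTO knowledge_graphs VALUES ('kg-1')")  # duplicate -> error
except sqlite3.IntegrityError:
    pass

# The whole block rolled back: not even the first insert survived.
rows = conn.execute("SELECT COUNT(*) FROM knowledge_graphs").fetchone()[0]
```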
coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (6)
src/api/tests/unit/management/test_architecture.py (1)

230-240: Keep the IAM carve-out narrower than the whole presentation package.

The current need is just current-user extraction in the new route modules, but excluding management.presentation* now lets any future presentation code import arbitrary IAM modules without this test failing. Prefer a narrower carve-out or a shared auth dependency so the boundary still protects the rest of the package.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/tests/unit/management/test_architecture.py` around lines 230 - 240,
The test currently broadens the IAM carve-out by excluding
"management.presentation*" which allows any future presentation code to import
IAM; narrow the carve-out in the archrule("management_no_iam") rule so only the
specific modules that perform CurrentUser extraction or shared auth dependency
are excluded (e.g., replace exclude("management.presentation*") with an exclude
for the exact auth/current-user modules such as
"management.presentation.current_user" or
"management.presentation.routes.current_user" or the shared auth dependency
module name), keeping the match("management*") and should_not_import("iam*")
checks unchanged.
src/api/tests/integration/management/test_knowledge_graph_api.py (1)

18-27: Consider extracting _get_root_workspace_id to conftest.py to avoid duplication.

This helper function is duplicated in test_data_source_api.py. Moving it to conftest.py would reduce duplication and provide a single source of truth.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/tests/integration/management/test_knowledge_graph_api.py` around
lines 18 - 27, Extract the duplicated helper
_get_root_workspace_id(async_client, tenant_auth_headers) into conftest.py as a
shared pytest async helper, then remove the duplicate in
test_knowledge_graph_api.py and test_data_source_api.py and import/use the
fixture/function from conftest.py; ensure the signature and return type remain
the same and update any imports or test usages to call the shared
_get_root_workspace_id helper (or register it as a fixture) so both tests
reference the single implementation.
src/api/management/presentation/data_sources/models.py (1)

119-155: Consider including error field in SyncRunResponse for failed sync visibility.

The domain DataSourceSyncRun entity has an error: str | None field, but SyncRunResponse omits it. For sync runs with status="failed", clients may need the error message for debugging or display purposes.

♻️ Suggested addition
 class SyncRunResponse(BaseModel):
     """Response containing sync run details.

     Attributes:
         id: Sync run ID
         data_source_id: Data source this sync belongs to
         status: Sync run status (pending, running, completed, failed)
         started_at: Sync start timestamp
         completed_at: Sync completion timestamp (None if not complete)
+        error: Error message if sync failed (None otherwise)
         created_at: Record creation timestamp
     """

     id: str
     data_source_id: str
     status: str
     started_at: datetime
     completed_at: datetime | None
+    error: str | None
     created_at: datetime

     `@classmethod`
     def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse:
         ...
         return cls(
             id=sync_run.id,
             data_source_id=sync_run.data_source_id,
             status=sync_run.status,
             started_at=sync_run.started_at,
             completed_at=sync_run.completed_at,
+            error=sync_run.error,
             created_at=sync_run.created_at,
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/data_sources/models.py` around lines 119 -
155, Add the missing error field to the API response: update the SyncRunResponse
model to include an error: str | None attribute and populate it in the
from_domain classmethod from the DataSourceSyncRun.error value so failed syncs
expose their error message; specifically modify the SyncRunResponse class
definition and its from_domain method to accept and set error using
sync_run.error.
src/api/management/presentation/knowledge_graphs/routes.py (2)

157-167: String-based error detection is fragile.

Checking "not found" in error_msg to distinguish 404 from 400 is brittle—it depends on the exact wording of the exception message. Consider using a dedicated exception type (e.g., KnowledgeGraphNotFoundError) for clearer semantics.

♻️ Suggested approach
# In ports/exceptions.py or domain/exceptions.py
class KnowledgeGraphNotFoundError(Exception):
    """Raised when a knowledge graph is not found."""
    pass

# In route handler
except KnowledgeGraphNotFoundError:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Knowledge graph not found",
    )
except (InvalidKnowledgeGraphNameError, ValueError) as e:
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=str(e),
    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 157
- 167, Replace the fragile string check on the exception message by introducing
a dedicated exception type (e.g., KnowledgeGraphNotFoundError) and handling it
explicitly: add KnowledgeGraphNotFoundError to your exceptions module
(ports/exceptions.py or domain/exceptions.py) and then change the route's except
clauses to catch KnowledgeGraphNotFoundError and raise
HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Knowledge graph not
found") while leaving InvalidKnowledgeGraphNameError and ValueError to be caught
separately and mapped to 400 with their message; update any code that currently
raises the generic "not found" ValueError to raise KnowledgeGraphNotFoundError
instead.

81-93: In-memory pagination may cause performance issues with large datasets.

The current implementation fetches all knowledge graphs from the service and then slices in Python. For workspaces with many KGs, this could cause memory and latency issues.

Consider passing offset and limit to the service layer to enable database-level pagination.

♻️ Suggested improvement
-    all_kgs = await service.list_for_workspace(
-        user_id=current_user.user_id.value,
-        workspace_id=workspace_id,
-    )
-    total = len(all_kgs)
-    paginated = all_kgs[offset : offset + limit]
+    kgs, total = await service.list_for_workspace(
+        user_id=current_user.user_id.value,
+        workspace_id=workspace_id,
+        offset=offset,
+        limit=limit,
+    )
     return KnowledgeGraphListResponse(
-        items=[KnowledgeGraphResponse.from_domain(kg) for kg in paginated],
+        items=[KnowledgeGraphResponse.from_domain(kg) for kg in kgs],
         total=total,
         offset=offset,
         limit=limit,
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 81 -
93, The handler currently does in-memory pagination by calling
service.list_for_workspace(...) to fetch all_kgs then slicing with offset/limit;
modify service.list_for_workspace to accept offset and limit (e.g.,
list_for_workspace(user_id, workspace_id, offset, limit)) and return both the
paginated items and total count (or a paginated result object), then update this
route to call service.list_for_workspace(user_id=current_user.user_id.value,
workspace_id=workspace_id, offset=offset, limit=limit), use the returned items
to build
KnowledgeGraphListResponse(items=[KnowledgeGraphResponse.from_domain(kg) for kg
in items], total=total, offset=offset, limit=limit), and remove the Python
slicing of all_kgs to enable DB-level pagination.
src/api/tests/integration/management/conftest.py (1)

100-119: Consider logging cleanup failures for test debugging.

The cleanup function catches all exceptions and re-raises after rollback, but doesn't log what failed. This could make debugging flaky tests harder.

♻️ Optional: Add logging for cleanup failures
+import logging
+
+logger = logging.getLogger(__name__)
+
     async def cleanup() -> None:
         try:
             await mgmt_async_session.execute(text("DELETE FROM data_source_sync_runs"))
             ...
         except Exception:
+            logger.exception("Failed to clean management data")
             await mgmt_async_session.rollback()
             raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/tests/integration/management/conftest.py` around lines 100 - 119, The
cleanup() function swallows exception details—add logging so failures are
visible: create or use a module logger (e.g., logger =
logging.getLogger(__name__)) and, inside the except Exception block of
cleanup(), call logger.exception(...) with a descriptive message (e.g., "cleanup
failed during management DB teardown") before calling await
mgmt_async_session.rollback() and re-raising; reference the cleanup() function
and the mgmt_async_session.execute calls / outbox delete to give context in the
log message.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 327-345: The code directly mutates ds.credentials_path after
storing secrets which bypasses the aggregate update logic and event/timestamp
handling; instead call the aggregate's update method
(DataSource.update_connection / ds.update_connection) to set credentials_path
and updated_by so the DataSourceUpdated event and timestamps are emitted.
Concretely, after awaiting self._secret_store.store(...) remove the direct
assignment to ds.credentials_path and invoke ds.update_connection(...) with
credentials_path=cred_path (and preserved name/connection_config values when not
changing) so all credential-only PATCHes go through the aggregate API.

In `@src/api/management/presentation/data_sources/routes.py`:
- Around line 91-102: The route is loading all data sources then slicing
(offset/limit) which causes memory/latency issues; update the handler to push
pagination into the service/repository layer by changing the call to
service.list_for_knowledge_graph to accept offset and limit (e.g.
list_for_knowledge_graph(user_id=..., kg_id=..., offset=offset, limit=limit))
and make that service return a paginated result (items + total) instead of a
full list, then construct DataSourceListResponse from that returned items and
total (remove the local len() and slicing/paginated logic and stop using
paginated = all_ds[offset:offset+limit]); also update the underlying repository
methods to perform the database-level pagination and counting so
DataSourceResponse.from_domain is fed only the page of items.

In `@src/api/management/presentation/knowledge_graphs/routes.py`:
- Around line 197-203: The catch-all "except Exception:" in the delete knowledge
graph route masks underlying errors; remove that broad except so unexpected
exceptions propagate to FastAPI's default handler, or if you must keep a
catch-all, capture the exception as "except Exception as e:" and call the module
logger with logger.exception("Failed to delete knowledge graph") then re-raise
HTTPException providing contextual detail (e.g., detail=str(e) or a sanitized
message) so the original error is logged; modify the block around HTTPException
to either delete the generic except or replace it with the logged-and-re-raised
variant involving HTTPException, "e", and logger.exception.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d04f52c7-ee50-49de-991a-e0777f004c51

📥 Commits

Reviewing files that changed from the base of the PR and between cafacf8 and cfce711.

📒 Files selected for processing (17)
  • src/api/main.py
  • src/api/management/application/services/data_source_service.py
  • src/api/management/application/services/knowledge_graph_service.py
  • src/api/management/presentation/__init__.py
  • src/api/management/presentation/data_sources/__init__.py
  • src/api/management/presentation/data_sources/models.py
  • src/api/management/presentation/data_sources/routes.py
  • src/api/management/presentation/knowledge_graphs/__init__.py
  • src/api/management/presentation/knowledge_graphs/models.py
  • src/api/management/presentation/knowledge_graphs/routes.py
  • src/api/tests/integration/management/conftest.py
  • src/api/tests/integration/management/test_data_source_api.py
  • src/api/tests/integration/management/test_knowledge_graph_api.py
  • src/api/tests/unit/management/presentation/__init__.py
  • src/api/tests/unit/management/presentation/test_data_source_routes.py
  • src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py
  • src/api/tests/unit/management/test_architecture.py

Comment on lines +165 to +175
```python
        if raw_credentials is not None:
            cred_path = f"datasource/{ds.id.value}/credentials"
            await self._secret_store.store(
                path=cred_path,
                tenant_id=self._scope_to_tenant,
                credentials=raw_credentials,
            )
            ds.credentials_path = cred_path

        await self._ds_repo.save(ds)
    except IntegrityError as e:
```

⚠️ Potential issue | 🔴 Critical

Avoid writing secrets before the database write is known to succeed.

At Line 167 and Line 339, _secret_store.store() runs before the database work has been flushed/committed. If save() or commit then fails, create leaves an orphaned secret and update can rotate credentials even though the request returns an error. It also keeps the transaction open across external I/O. Flush first, then write the secret, and add compensation if anything after the secret write fails.

Also applies to: 337-347

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
165 - 175, The secret is being stored via _secret_store.store(...) before the
database save/commit (_ds_repo.save(ds)) completes, which can create orphaned
secrets and prolong transactions; change the flow so you persist/flush/commit
the datasource first (call _ds_repo.save(ds) and ensure the transaction is
committed or flushed) and only then call _secret_store.store(...) and set
ds.credentials_path, minimizing time inside the DB transaction; additionally,
add compensation logic around the secret write (e.g., if any subsequent
operation after storing the secret fails, call _secret_store.delete(...) to
remove the orphaned secret) and ensure error handling around the try/except
block (including the IntegrityError handler) is updated to account for the new
ordering.

Comment on lines +70 to +74
```python
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
```

⚠️ Potential issue | 🟠 Major

Replace message-based ValueError handling with typed service exceptions.

You already use typed exceptions for duplicate and unauthorized cases; not-found/validation should follow the same pattern. Right now the router either echoes str(e) to the client or parses "not found" out of the message to pick the status code, and src/api/management/application/services/data_source_service.py Lines 149-152 include KG/tenant details in those messages.

Also applies to: 167-177

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/data_sources/routes.py` around lines 70 - 74,
The router currently catches generic ValueError and returns str(e); replace this
with typed service exceptions (e.g., DataSourceNotFoundError and
DataSourceValidationError) so the route can map them to 404 and 400 respectively
instead of echoing/parsing error messages. Update the service (see
data_source_service.py around the 149-152 area) to raise these specific
exceptions (and remove KG/tenant details from their messages), then change the
two ValueError handlers in routes.py (both the 70-74 block and the similar
167-177 block) to catch the new exceptions and raise HTTPException with the
proper status_code and a safe, minimal detail message. Ensure existing
Duplicate* and Unauthorized* exception handling remains unchanged.
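One way the typed-exception mapping could look — the exception names and detail strings below are assumptions for illustration, not the codebase's actual classes:

```python
# Sketch: the service raises typed exceptions; the route maps each class to
# a fixed status code and a constant, non-leaky detail string.
class DataSourceNotFoundError(Exception):
    """Raised by the service when the data source does not exist (or access is denied)."""


class DataSourceValidationError(Exception):
    """Raised by the service when the request payload is invalid."""


def map_to_http(exc: Exception) -> tuple[int, str]:
    """Route-layer mapping; never echoes str(exc) back to the client."""
    if isinstance(exc, DataSourceNotFoundError):
        return 404, "Data source not found"
    if isinstance(exc, DataSourceValidationError):
        return 400, "Invalid data source request"
    raise exc  # anything else propagates to the framework's handler
```

The route handler then raises `HTTPException` from the returned pair, and the service's internal messages (which may mention KG or tenant IDs) never reach the wire.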

Comment on lines +91 to +102
all_ds = await service.list_for_knowledge_graph(
    user_id=current_user.user_id.value,
    kg_id=kg_id,
)
total = len(all_ds)
paginated = all_ds[offset : offset + limit]
return DataSourceListResponse(
    items=[DataSourceResponse.from_domain(ds) for ds in paginated],
    total=total,
    offset=offset,
    limit=limit,
)

⚠️ Potential issue | 🟠 Major

Push pagination below the route layer.

At Line 91, the handler loads every data source and only then applies offset/limit. That makes latency and memory grow with the full collection size, so this endpoint will degrade as a knowledge graph grows. Have the service/repository return the requested page and total count directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/data_sources/routes.py` around lines 91 -
102, The route is loading all data sources then slicing (offset/limit) which
causes memory/latency issues; update the handler to push pagination into the
service/repository layer by changing the call to
service.list_for_knowledge_graph to accept offset and limit (e.g.
list_for_knowledge_graph(user_id=..., kg_id=..., offset=offset, limit=limit))
and make that service return a paginated result (items + total) instead of a
full list, then construct DataSourceListResponse from that returned items and
total (remove the local len() and slicing/paginated logic and stop using
paginated = all_ds[offset:offset+limit]); also update the underlying repository
methods to perform the database-level pagination and counting so
DataSourceResponse.from_domain is fed only the page of items.
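A sketch of the shape this could take once slicing moves below the route layer — the repository below fakes LIMIT/OFFSET in memory, and the method names (`page_for_knowledge_graph`, `count_for_knowledge_graph`) are hypothetical:

```python
# Sketch: the repository performs the slice and the count (in SQL, via
# LIMIT/OFFSET and COUNT(*)); the service returns (items, total) so the
# route never materializes the full collection.
class InMemoryDataSourceRepo:
    def __init__(self, rows: list[dict]) -> None:
        self.rows = rows

    def count_for_knowledge_graph(self, kg_id: str) -> int:
        return sum(1 for r in self.rows if r["kg_id"] == kg_id)

    def page_for_knowledge_graph(self, kg_id: str, offset: int, limit: int) -> list[dict]:
        matching = [r for r in self.rows if r["kg_id"] == kg_id]
        return matching[offset : offset + limit]  # stands in for SQL LIMIT/OFFSET


def list_for_knowledge_graph(repo, kg_id: str, offset: int = 0, limit: int = 20):
    items = repo.page_for_knowledge_graph(kg_id, offset, limit)
    total = repo.count_for_knowledge_graph(kg_id)
    return items, total
```

With this shape, the route builds `DataSourceListResponse` directly from `(items, total)` and memory stays bounded by `limit` regardless of collection size.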

Comment on lines +197 to +203
except HTTPException:
    raise
except Exception:
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail="Failed to delete knowledge graph",
    )

⚠️ Potential issue | 🟡 Minor

Catch-all exception handler masks underlying errors.

The broad except Exception clause will catch and hide any unexpected errors as a generic 500 response. This makes debugging difficult and may mask bugs. Consider removing it and letting unexpected exceptions propagate to FastAPI's default error handling, or at least log the exception.

🛡️ Suggested fix
     except HTTPException:
         raise
-    except Exception:
-        raise HTTPException(
-            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            detail="Failed to delete knowledge graph",
-        )
+    # Let unexpected exceptions propagate for proper error logging

Or if you need the catch-all:

+    import logging
+    logger = logging.getLogger(__name__)
+
     except Exception:
+        logger.exception("Unexpected error deleting knowledge graph %s", kg_id)
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail="Failed to delete knowledge graph",
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 197
- 203, The catch-all "except Exception:" in the delete knowledge graph route
masks underlying errors; remove that broad except so unexpected exceptions
propagate to FastAPI's default handler, or if you must keep a catch-all, capture
the exception as "except Exception as e:" and call the module logger with
logger.exception("Failed to delete knowledge graph") then re-raise HTTPException
providing contextual detail (e.g., detail=str(e) or a sanitized message) so the
original error is logged; modify the block around HTTPException to either delete
the generic except or replace it with the logged-and-re-raised variant involving
HTTPException, "e", and logger.exception.

…error to SyncRunResponse

- Route credential-only updates through ds.update_connection() instead of
  directly mutating ds.credentials_path, ensuring DataSourceUpdated event
  emission and updated_at timestamp update
- Add error field to SyncRunResponse so failed syncs expose their error message

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
src/api/management/presentation/data_sources/routes.py (1)

70-74: ⚠️ Potential issue | 🟠 Major

Stop using ValueError message parsing/echoing for HTTP mapping.

These handlers currently depend on str(e) (including "not found" string matching), which is brittle and can expose internal service details to clients. Please switch to typed service exceptions (not found vs validation) and map them explicitly to 404/400.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

Also applies to: 188-198, 267-271

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/data_sources/routes.py` around lines 70 - 74,
Current handlers catch ValueError and echo str(e) which is brittle and leaks
internals; replace this by introducing typed service exceptions (e.g.,
ServiceNotFoundError and ServiceValidationError) in the service layer and raise
those instead of ValueError, then update the route handlers in routes.py to
catch ServiceNotFoundError -> raise
HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="resource not
found") and catch ServiceValidationError -> raise
HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e.validation_message) or a sanitized message); remove any logic that
parses "not found" from str(e) and ensure the route handlers reference the new
exception classes instead of ValueError.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 258-268: The current code fetches all KG IDs then does N+1 reads
and applies offset/limit after loading (variables kg_ids, kgs, and method
_kg_repo.get_by_id), causing poor scalability; change the flow to apply
pagination at the repo level (do not load all KGs then slice). Update or use a
repository method that returns paginated KGs filtered by tenant (e.g., add or
call _kg_repo.list_for_workspace or _kg_repo.list_by_workspace with offset and
limit and tenant_id parameters) so you fetch only the requested page and avoid
per-id get_by_id loops; adjust callers to expect total count and the paginated
list returned from the repo.
- Around line 390-395: The current deletion uses a single fetch with limit=10000
(in knowledge_graph_service.py) which can leave data sources undeleted; change
the logic around self._ds_repo.find_by_knowledge_graph and deletion so you page
through results until none remain: repeatedly call find_by_knowledge_graph
(using a reasonable page_size/limit) and for each returned batch call
ds.mark_for_deletion(...) and await self._ds_repo.delete(ds) for each item,
looping (incrementing offset or using the repo’s cursor) until the fetched batch
is empty; ensure this loop runs before deleting the KG to avoid integrity
errors.

In `@src/api/management/presentation/knowledge_graphs/routes.py`:
- Around line 61-65: Replace the broad except (InvalidKnowledgeGraphNameError,
ValueError) blocks that inspect and echo ValueError text with explicit typed
exceptions from the KnowledgeGraphService (e.g., KnowledgeGraphNotFoundError,
KnowledgeGraphValidationError) and map each to a fixed, safe HTTPException
message; update the handlers in routes.py (including the other occurrences
around the 178–188 area) to catch those concrete exceptions instead of
ValueError and return appropriate status codes (400 for validation, 404 for
not-found) with non-sensitive, constant detail strings rather than str(e).

In `@src/api/tests/integration/management/conftest.py`:
- Around line 111-113: The DELETE statement that cleans the outbox only targets
aggregate_type values ('KnowledgeGraph','DataSource') but management events here
use lowercase names, so update the cleanup in conftest.py to match
case-insensitively (e.g., use LOWER(aggregate_type) IN
('knowledge_graph','data_source') or add both cased variants) to ensure rows for
the outbox table are removed; locate the DELETE FROM outbox WHERE aggregate_type
... statement in conftest.py and change it accordingly.

---

Duplicate comments:
In `@src/api/management/presentation/data_sources/routes.py`:
- Around line 70-74: Current handlers catch ValueError and echo str(e) which is
brittle and leaks internals; replace this by introducing typed service
exceptions (e.g., ServiceNotFoundError and ServiceValidationError) in the
service layer and raise those instead of ValueError, then update the route
handlers in routes.py to catch ServiceNotFoundError -> raise
HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="resource not
found") and catch ServiceValidationError -> raise
HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e.validation_message) or a sanitized message); remove any logic that
parses "not found" from str(e) and ensure the route handlers reference the new
exception classes instead of ValueError.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 64438ed4-e25d-4eb9-8a3a-9d339ea59a43

📥 Commits

Reviewing files that changed from the base of the PR and between cfce711 and 634108b.

📒 Files selected for processing (14)
  • src/api/management/application/services/data_source_service.py
  • src/api/management/application/services/knowledge_graph_service.py
  • src/api/management/infrastructure/repositories/data_source_repository.py
  • src/api/management/infrastructure/repositories/knowledge_graph_repository.py
  • src/api/management/ports/repositories.py
  • src/api/management/presentation/data_sources/routes.py
  • src/api/management/presentation/knowledge_graphs/routes.py
  • src/api/tests/integration/management/conftest.py
  • src/api/tests/integration/management/test_data_source_repository.py
  • src/api/tests/integration/management/test_knowledge_graph_repository.py
  • src/api/tests/unit/management/application/test_data_source_service.py
  • src/api/tests/unit/management/application/test_knowledge_graph_service.py
  • src/api/tests/unit/management/presentation/test_data_source_routes.py
  • src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py

Comment on lines 390 to 395
data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
    kg_id, offset=0, limit=10000
)
for ds in data_sources:
    ds.mark_for_deletion(deleted_by=user_id)
    await self._ds_repo.delete(ds)

⚠️ Potential issue | 🟠 Major

Cascade delete can miss records because of the fixed limit=10000.

If a KG has more than 10,000 data sources, this path won’t delete all children before deleting the KG. That can produce integrity failures and false 500s.

Proposed fix (delete in pages until empty)
-                data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
-                    kg_id, offset=0, limit=10000
-                )
-                for ds in data_sources:
-                    ds.mark_for_deletion(deleted_by=user_id)
-                    await self._ds_repo.delete(ds)
+                page_size = 500
+                while True:
+                    data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
+                        kg_id, offset=0, limit=page_size
+                    )
+                    if not data_sources:
+                        break
+                    for ds in data_sources:
+                        ds.mark_for_deletion(deleted_by=user_id)
+                        await self._ds_repo.delete(ds)

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/knowledge_graph_service.py` around
lines 390 - 395, The current deletion uses a single fetch with limit=10000 (in
knowledge_graph_service.py) which can leave data sources undeleted; change the
logic around self._ds_repo.find_by_knowledge_graph and deletion so you page
through results until none remain: repeatedly call find_by_knowledge_graph
(using a reasonable page_size/limit) and for each returned batch call
ds.mark_for_deletion(...) and await self._ds_repo.delete(ds) for each item,
looping (incrementing offset or using the repo’s cursor) until the fetched batch
is empty; ensure this loop runs before deleting the KG to avoid integrity
errors.

Comment on lines +61 to +65
except (InvalidKnowledgeGraphNameError, ValueError) as e:
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=str(e),
    )

⚠️ Potential issue | 🟠 Major

Avoid ValueError string inspection/echo in route error handling.

Using str(e) for client details and "not found" text matching makes status mapping fragile and leaks service internals. Prefer explicit typed exceptions from KnowledgeGraphService (e.g., not-found vs validation) and fixed safe response messages.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

Also applies to: 178-188

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 61 -
65, Replace the broad except (InvalidKnowledgeGraphNameError, ValueError) blocks
that inspect and echo ValueError text with explicit typed exceptions from the
KnowledgeGraphService (e.g., KnowledgeGraphNotFoundError,
KnowledgeGraphValidationError) and map each to a fixed, safe HTTPException
message; update the handlers in routes.py (including the other occurrences
around the 178–188 area) to catch those concrete exceptions instead of
ValueError and return appropriate status codes (400 for validation, 404 for
not-found) with non-sensitive, constant detail strings rather than str(e).

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
src/api/management/application/services/data_source_service.py (1)

165-174: ⚠️ Potential issue | 🔴 Critical

Secret writes still happen before the database outcome is known.

_secret_store.store() runs inside the open transaction and before the data-source write/commit has succeeded. If save()/flush/commit then fails, create/update can leave an orphaned or prematurely rotated secret, and the DB transaction stays open across external I/O. Persist/flush first, then write the secret, and add compensation if anything after the secret write fails.

Also applies to: 344-359

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
165 - 174, The secret is being written via _secret_store.store() before the DB
transaction completes (call to self._ds_repo.save(ds)), which can leave
orphaned/rotated secrets or keep the transaction open; change the flow in the
create/update path so you persist/flush/commit the datasource first (call
self._ds_repo.save(ds) and ensure the unit-of-work/transaction is committed or
flushed) and only after successful DB commit perform
self._secret_store.store(path=cred_path, tenant_id=self._scope_to_tenant,
credentials=raw_credentials) and set ds.credentials_path accordingly; if the
secret write fails, add compensation logic to remove the newly committed
credentials_path (or attempt a DB-side rollback/update to clear
credentials_path) so the DB and secret store remain consistent (apply the same
fix for the other occurrence around lines 344-359).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 334-357: The code currently calls ds.update_connection() twice
(before and after storing secrets), causing duplicate DataSourceUpdated events;
instead compute the final name, connection_config, and credentials_path first,
then: if raw_credentials is provided, await self._secret_store.store(... ) to
write secrets and set credentials_path to
f"datasource/{ds.id.value}/credentials", then call ds.update_connection(...)
exactly once with the final values (name, connection_config, credentials_path,
updated_by=user_id); if raw_credentials is not provided but
name/connection_config changed, call ds.update_connection(...) once with the
computed final values; reference ds.update_connection,
DataSource.update_connection, _secret_store.store, raw_credentials, and
credentials_path when making this change.

In `@src/api/management/presentation/data_sources/models.py`:
- Around line 136-156: The from_domain classmethod on SyncRunResponse currently
forwards DataSourceSyncRun.error verbatim into the API response, which can leak
internal details; change SyncRunResponse.from_domain to not include raw
sync_run.error directly and instead set the response error to a sanitized
client-facing value (e.g., a short reason string or enum code) by calling a new
helper (e.g., sanitize_sync_error or map_sync_error_to_code) or returning None
when appropriate, and ensure the original sync_run.error is preserved only in
logs/observability (log or attach to internal diagnostics) rather than returned
to clients.

---

Duplicate comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 165-174: The secret is being written via _secret_store.store()
before the DB transaction completes (call to self._ds_repo.save(ds)), which can
leave orphaned/rotated secrets or keep the transaction open; change the flow in
the create/update path so you persist/flush/commit the datasource first (call
self._ds_repo.save(ds) and ensure the unit-of-work/transaction is committed or
flushed) and only after successful DB commit perform
self._secret_store.store(path=cred_path, tenant_id=self._scope_to_tenant,
credentials=raw_credentials) and set ds.credentials_path accordingly; if the
secret write fails, add compensation logic to remove the newly committed
credentials_path (or attempt a DB-side rollback/update to clear
credentials_path) so the DB and secret store remain consistent (apply the same
fix for the other occurrence around lines 344-359).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7e9a1681-f3bb-455d-8a8a-744c110a36e2

📥 Commits

Reviewing files that changed from the base of the PR and between 634108b and 5b7d0a6.

📒 Files selected for processing (2)
  • src/api/management/application/services/data_source_service.py
  • src/api/management/presentation/data_sources/models.py

Comment on lines +334 to +357
if name is not None or connection_config is not None:
    ds.update_connection(
        name=name if name is not None else ds.name,
        connection_config=connection_config
        if connection_config is not None
        else ds.connection_config,
        credentials_path=ds.credentials_path,
        updated_by=user_id,
    )

if raw_credentials is not None:
    cred_path = f"datasource/{ds.id.value}/credentials"
    await self._secret_store.store(
        path=cred_path,
        tenant_id=self._scope_to_tenant,
        credentials=raw_credentials,
    )
    # Update via aggregate method to emit event and update timestamps
    ds.update_connection(
        name=ds.name,
        connection_config=ds.connection_config,
        credentials_path=cred_path,
        updated_by=user_id,
    )

⚠️ Potential issue | 🟠 Major

A combined PATCH can emit two DataSourceUpdated events.

When a request changes name/config and credentials together, this method calls ds.update_connection() once before the secret write and again after it. In src/api/management/domain/aggregates/data_source.py, DataSource.update_connection() appends a DataSourceUpdated event on every call, so one API request produces duplicate update events and timestamp churn. Compute the final values first and invoke the aggregate once.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/application/services/data_source_service.py` around lines
334 - 357, The code currently calls ds.update_connection() twice (before and
after storing secrets), causing duplicate DataSourceUpdated events; instead
compute the final name, connection_config, and credentials_path first, then: if
raw_credentials is provided, await self._secret_store.store(... ) to write
secrets and set credentials_path to f"datasource/{ds.id.value}/credentials",
then call ds.update_connection(...) exactly once with the final values (name,
connection_config, credentials_path, updated_by=user_id); if raw_credentials is
not provided but name/connection_config changed, call ds.update_connection(...)
once with the computed final values; reference ds.update_connection,
DataSource.update_connection, _secret_store.store, raw_credentials, and
credentials_path when making this change.
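A compact sketch of the compute-once-then-call-once shape; the `DataSource` class below is a toy stand-in for the real aggregate:

```python
# Sketch: compute the final name/config/path first, then invoke the
# aggregate exactly once, so a combined PATCH emits one update event.
class DataSource:
    def __init__(self, name: str, connection_config: dict, credentials_path: str) -> None:
        self.name = name
        self.connection_config = connection_config
        self.credentials_path = credentials_path
        self.events: list[str] = []

    def update_connection(self, name, connection_config, credentials_path, updated_by) -> None:
        self.name = name
        self.connection_config = connection_config
        self.credentials_path = credentials_path
        self.events.append("DataSourceUpdated")  # one event per call


def apply_patch(ds, name=None, connection_config=None, new_credentials_path=None, user_id="u"):
    final_name = name if name is not None else ds.name
    final_config = connection_config if connection_config is not None else ds.connection_config
    final_path = new_credentials_path if new_credentials_path is not None else ds.credentials_path
    if (final_name, final_config, final_path) != (ds.name, ds.connection_config, ds.credentials_path):
        ds.update_connection(final_name, final_config, final_path, updated_by=user_id)
```

In the real service, the secret write would happen before `apply_patch`-style aggregation, yielding `new_credentials_path`; the point is that the aggregate mutation happens once, with final values.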

Comment on lines +136 to +156
error: str | None
created_at: datetime

@classmethod
def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse:
    """Convert domain DataSourceSyncRun entity to API response.

    Args:
        sync_run: DataSourceSyncRun domain entity

    Returns:
        SyncRunResponse with sync run details
    """
    return cls(
        id=sync_run.id,
        data_source_id=sync_run.data_source_id,
        status=sync_run.status,
        started_at=sync_run.started_at,
        completed_at=sync_run.completed_at,
        error=sync_run.error,
        created_at=sync_run.created_at,

⚠️ Potential issue | 🟠 Major

Avoid making raw sync failure text part of the public API.

sync_run.error is forwarded verbatim into the response model. If that field carries upstream exception text, failed syncs can leak internal URLs, identifiers, or secrets. Prefer a sanitized client-facing reason/code here and keep the raw error for logs/observability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/presentation/data_sources/models.py` around lines 136 -
156, The from_domain classmethod on SyncRunResponse currently forwards
DataSourceSyncRun.error verbatim into the API response, which can leak internal
details; change SyncRunResponse.from_domain to not include raw sync_run.error
directly and instead set the response error to a sanitized client-facing value
(e.g., a short reason string or enum code) by calling a new helper (e.g.,
sanitize_sync_error or map_sync_error_to_code) or returning None when
appropriate, and ensure the original sync_run.error is preserved only in
logs/observability (log or attach to internal diagnostics) rather than returned
to clients.

…atch

- Replace limit=10000 magic number with batched while-loop (100 per batch)
  for cascade deleting data sources when a knowledge graph is deleted
- Fix outbox cleanup SQL to use snake_case aggregate types matching what
  repositories actually write ('knowledge_graph', 'data_source')

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
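The outbox cleanup fix from this commit can be sketched against a throwaway in-memory SQLite table (column names assumed from the review comment):

```python
# Sketch: delete outbox rows using the snake_case aggregate types that the
# repositories actually write ('knowledge_graph', 'data_source').
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER, aggregate_type TEXT)")
conn.executemany(
    "INSERT INTO outbox VALUES (?, ?)",
    [(1, "knowledge_graph"), (2, "data_source"), (3, "workspace")],
)
conn.execute(
    "DELETE FROM outbox WHERE aggregate_type IN ('knowledge_graph', 'data_source')"
)
# Only aggregate types the management cleanup should not touch remain.
remaining = [row[1] for row in conn.execute("SELECT id, aggregate_type FROM outbox")]
```

Matching the exact strings the repositories write avoids the original bug, where the cleanup targeted CamelCase names and silently deleted nothing.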