feat(management): add FastAPI routes and dependency injection (AIHCM-185)#303
…ate/update

The create() and update() methods in DataSourceService did not catch IntegrityError for duplicate data source names within a knowledge graph. This adds the same pattern used in KnowledgeGraphService.create() to raise DuplicateDataSourceNameError when the uq_data_sources_kg_name constraint is violated.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
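The constraint-translation pattern this commit describes can be sketched roughly as follows. This is an illustrative stand-in, not the real service code: IntegrityError here substitutes for sqlalchemy.exc.IntegrityError, and the repository callable is a hypothetical simplification.

```python
class IntegrityError(Exception):
    """Stand-in for sqlalchemy.exc.IntegrityError."""

class DuplicateDataSourceNameError(Exception):
    """Raised when a data source name already exists within a knowledge graph."""

def save_data_source(repo_save, name: str) -> None:
    try:
        repo_save(name)
    except IntegrityError as e:
        # Only translate violations of the specific unique constraint; anything
        # else is re-raised untouched so genuine integrity bugs still surface.
        if "uq_data_sources_kg_name" in str(e):
            raise DuplicateDataSourceNameError(name) from e
        raise

# Usage: a fake repository that simulates the unique-constraint violation.
seen: set[str] = set()

def fake_repo_save(name: str) -> None:
    if name in seen:
        raise IntegrityError('duplicate key violates "uq_data_sources_kg_name"')
    seen.add(name)

save_data_source(fake_repo_save, "crm-export")
try:
    save_data_source(fake_repo_save, "crm-export")
    duplicate_caught = False
except DuplicateDataSourceNameError:
    duplicate_caught = True
```

The constraint name is matched rather than assumed, so unrelated IntegrityErrors propagate unchanged.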
…date()

Change update() signature to accept optional name and description parameters. When None is passed, the existing values are preserved. This enables PATCH semantics in the upcoming route handlers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…-185)
Add FastAPI routes for Knowledge Graph and Data Source CRUD operations:
Knowledge Graph routes:
- POST /management/workspaces/{workspace_id}/knowledge-graphs
- GET /management/workspaces/{workspace_id}/knowledge-graphs (paginated)
- GET /management/knowledge-graphs/{kg_id}
- PATCH /management/knowledge-graphs/{kg_id}
- DELETE /management/knowledge-graphs/{kg_id}
Data Source routes:
- POST /management/knowledge-graphs/{kg_id}/data-sources
- GET /management/knowledge-graphs/{kg_id}/data-sources (paginated)
- GET /management/data-sources/{ds_id}
- PATCH /management/data-sources/{ds_id}
- DELETE /management/data-sources/{ds_id}
- POST /management/data-sources/{ds_id}/sync (202 Accepted)
Includes Pydantic request/response models, DI wiring, router
registration in main.py, and 41 unit tests covering happy paths,
error handling (403/404/409/422), and pagination.
Architecture test updated to exclude management.presentation from
IAM import restriction (legitimate auth dependency at the boundary).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add integration tests that exercise the full stack: HTTP request -> route -> DI -> service -> SpiceDB + PostgreSQL -> response. Tests cover:
- KG creation, listing with pagination, get, update, delete
- DS creation with/without credentials, sync trigger (202)
- Authorization enforcement (403 for unauthorized users)
- Duplicate name detection (409)
- Not found handling (404)
Includes shared conftest with cleanup fixtures, SpiceDB permission helpers, and encryption key setup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… and expand integration tests
- Return 404 (not 400) for not-found resources in update routes
- Add defensive UnauthorizedError handler on get routes
- Use generic error messages in trigger_sync to prevent information leakage
- Add integration tests for DS get, list, update, and delete endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
No actionable comments were generated in the recent review. 🎉
Walkthrough

This PR introduces a Management bounded context and wires its router into the main FastAPI app. It adds presentation layers (APIRouters and Pydantic models) for knowledge graphs and data sources, implements paginated repository/service APIs returning (items, total), adds tenant-aware credential write-through to a secret store with duplicate-name handling, updates service/repository signatures, and adds extensive unit and integration tests plus updated test fixtures for authorization and lifecycle management.

Sequence Diagram(s)

sequenceDiagram
participant Client
participant RouteHandler as "Route Handler"
participant Auth as "Authorization"
participant Service as "Service Layer"
participant SecretStore as "Secret Store"
participant DB as "Database"
Client->>RouteHandler: POST /management/knowledge-graphs/{kg_id}/data-sources
RouteHandler->>Auth: verify permissions
Auth-->>RouteHandler: allowed
RouteHandler->>Service: create(kg_id, request, user_id)
Service->>DB: insert data_source (begin tx)
alt IntegrityError: duplicate name
DB-->>Service: IntegrityError (uq_data_sources_kg_name)
Service->>Service: emit probe (data_source_creation_failed)
Service-->>RouteHandler: DuplicateDataSourceNameError
RouteHandler-->>Client: 409 Conflict
else success
Service->>SecretStore: store credentials (tenant_id, raw_credentials)
SecretStore-->>Service: credentials_path
Service->>DB: update data_source.credentials_path (within tx) and commit
DB-->>Service: commit success
Service->>Service: emit probe (data_source_created)
Service-->>RouteHandler: DataSourceResponse
RouteHandler-->>Client: 201 Created
end
sequenceDiagram
participant Client
participant RouteHandler as "Route Handler"
participant Auth as "Authorization"
participant Service as "Service Layer"
participant DB as "Database"
Client->>RouteHandler: GET /management/workspaces/{workspace_id}/knowledge-graphs?offset=&limit=
RouteHandler->>Auth: verify permissions
Auth-->>RouteHandler: allowed
RouteHandler->>Service: list_for_workspace(user_id, workspace_id, offset, limit)
Service->>DB: query paginated KGs + count total
DB-->>Service: (items_page, total)
Service->>Service: emit probe (knowledge_graphs_listed)
Service-->>RouteHandler: (items_page, total)
RouteHandler-->>Client: 200 OK (items, total, offset, limit)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
…nation
- Add generic except Exception catch-all to all route handlers matching IAM pattern, preventing unhandled exceptions from leaking stack traces
- Move pagination from application-layer slicing to database-level LIMIT/OFFSET queries in repositories
- Repository find methods now return (items, total_count) tuples
- Service list methods pass through pagination parameters
- Routes pass offset/limit query params directly to services

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… transaction handling
- Update repository integration tests to wrap all raw SQL operations in async with session.begin() blocks instead of bare execute+commit
- Fix transaction-already-begun errors by ensuring every database operation uses explicit transaction boundaries
- Fix conftest clean_management_data fixture to use begin() context manager

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
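The explicit-transaction pattern this commit adopts (analogous to SQLAlchemy's async with session.begin()) can be illustrated with stdlib sqlite3, where a connection used as a context manager opens a transaction that commits on clean exit and rolls back on an exception. Table and value names below are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE knowledge_graphs (id TEXT PRIMARY KEY)")

# Success path: the context manager commits when the block exits cleanly.
with conn:
    conn.execute("INSERT INTO knowledge_graphs VALUES ('kg-1')")

# Failure path: the same context manager rolls the statement back.
try:
    with conn:
        conn.execute("INSERT INTO knowledge_graphs VALUES ('kg-2')")
        raise RuntimeError("simulated mid-transaction failure")
except RuntimeError:
    pass

surviving = [row[0] for row in conn.execute("SELECT id FROM knowledge_graphs")]
```

Every write gets an explicit begin/commit boundary, which is what avoids the "transaction already begun" errors the commit mentions.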
Actionable comments posted: 5
🧹 Nitpick comments (6)
src/api/tests/unit/management/test_architecture.py (1)
230-240: Keep the IAM carve-out narrower than the whole presentation package.

The current need is just current-user extraction in the new route modules, but excluding management.presentation* now lets any future presentation code import arbitrary IAM modules without this test failing. Prefer a narrower carve-out or a shared auth dependency so the boundary still protects the rest of the package.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/tests/unit/management/test_architecture.py` around lines 230 - 240, The test currently broadens the IAM carve-out by excluding "management.presentation*" which allows any future presentation code to import IAM; narrow the carve-out in the archrule("management_no_iam") rule so only the specific modules that perform CurrentUser extraction or shared auth dependency are excluded (e.g., replace exclude("management.presentation*") with an exclude for the exact auth/current-user modules such as "management.presentation.current_user" or "management.presentation.routes.current_user" or the shared auth dependency module name), keeping the match("management*") and should_not_import("iam*") checks unchanged.

src/api/tests/integration/management/test_knowledge_graph_api.py (1)
18-27: Consider extracting _get_root_workspace_id to conftest.py to avoid duplication.

This helper function is duplicated in test_data_source_api.py. Moving it to conftest.py would reduce duplication and provide a single source of truth.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/tests/integration/management/test_knowledge_graph_api.py` around lines 18 - 27, Extract the duplicated helper _get_root_workspace_id(async_client, tenant_auth_headers) into conftest.py as a shared pytest async helper, then remove the duplicate in test_knowledge_graph_api.py and test_data_source_api.py and import/use the fixture/function from conftest.py; ensure the signature and return type remain the same and update any imports or test usages to call the shared _get_root_workspace_id helper (or register it as a fixture) so both tests reference the single implementation.

src/api/management/presentation/data_sources/models.py (1)
119-155: Consider including an error field in SyncRunResponse for failed sync visibility.

The domain DataSourceSyncRun entity has an error: str | None field, but SyncRunResponse omits it. For sync runs with status="failed", clients may need the error message for debugging or display purposes.

♻️ Suggested addition

 class SyncRunResponse(BaseModel):
     """Response containing sync run details.

     Attributes:
         id: Sync run ID
         data_source_id: Data source this sync belongs to
         status: Sync run status (pending, running, completed, failed)
         started_at: Sync start timestamp
         completed_at: Sync completion timestamp (None if not complete)
+        error: Error message if sync failed (None otherwise)
         created_at: Record creation timestamp
     """

     id: str
     data_source_id: str
     status: str
     started_at: datetime
     completed_at: datetime | None
+    error: str | None
     created_at: datetime

     @classmethod
     def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse:
         ...
         return cls(
             id=sync_run.id,
             data_source_id=sync_run.data_source_id,
             status=sync_run.status,
             started_at=sync_run.started_at,
             completed_at=sync_run.completed_at,
+            error=sync_run.error,
             created_at=sync_run.created_at,
         )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/presentation/data_sources/models.py` around lines 119 - 155, Add the missing error field to the API response: update the SyncRunResponse model to include an error: str | None attribute and populate it in the from_domain classmethod from the DataSourceSyncRun.error value so failed syncs expose their error message; specifically modify the SyncRunResponse class definition and its from_domain method to accept and set error using sync_run.error.

src/api/management/presentation/knowledge_graphs/routes.py (2)
157-167: String-based error detection is fragile.

Checking "not found" in error_msg to distinguish 404 from 400 is brittle: it depends on the exact wording of the exception message. Consider using a dedicated exception type (e.g., KnowledgeGraphNotFoundError) for clearer semantics.

♻️ Suggested approach

# In ports/exceptions.py or domain/exceptions.py
class KnowledgeGraphNotFoundError(Exception):
    """Raised when a knowledge graph is not found."""
    pass

# In route handler
except KnowledgeGraphNotFoundError:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Knowledge graph not found",
    )
except (InvalidKnowledgeGraphNameError, ValueError) as e:
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=str(e),
    )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 157 - 167, Replace the fragile string check on the exception message by introducing a dedicated exception type (e.g., KnowledgeGraphNotFoundError) and handling it explicitly: add KnowledgeGraphNotFoundError to your exceptions module (ports/exceptions.py or domain/exceptions.py) and then change the route's except clauses to catch KnowledgeGraphNotFoundError and raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Knowledge graph not found") while leaving InvalidKnowledgeGraphNameError and ValueError to be caught separately and mapped to 400 with their message; update any code that currently raises the generic "not found" ValueError to raise KnowledgeGraphNotFoundError instead.
81-93: In-memory pagination may cause performance issues with large datasets.

The current implementation fetches all knowledge graphs from the service and then slices in Python. For workspaces with many KGs, this could cause memory and latency issues. Consider passing offset and limit to the service layer to enable database-level pagination.

♻️ Suggested improvement

-    all_kgs = await service.list_for_workspace(
-        user_id=current_user.user_id.value,
-        workspace_id=workspace_id,
-    )
-    total = len(all_kgs)
-    paginated = all_kgs[offset : offset + limit]
+    kgs, total = await service.list_for_workspace(
+        user_id=current_user.user_id.value,
+        workspace_id=workspace_id,
+        offset=offset,
+        limit=limit,
+    )
     return KnowledgeGraphListResponse(
-        items=[KnowledgeGraphResponse.from_domain(kg) for kg in paginated],
+        items=[KnowledgeGraphResponse.from_domain(kg) for kg in kgs],
         total=total,
         offset=offset,
         limit=limit,
     )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 81 - 93, The handler currently does in-memory pagination by calling service.list_for_workspace(...) to fetch all_kgs then slicing with offset/limit; modify service.list_for_workspace to accept offset and limit (e.g., list_for_workspace(user_id, workspace_id, offset, limit)) and return both the paginated items and total count (or a paginated result object), then update this route to call service.list_for_workspace(user_id=current_user.user_id.value, workspace_id=workspace_id, offset=offset, limit=limit), use the returned items to build KnowledgeGraphListResponse(items=[KnowledgeGraphResponse.from_domain(kg) for kg in items], total=total, offset=offset, limit=limit), and remove the Python slicing of all_kgs to enable DB-level pagination.

src/api/tests/integration/management/conftest.py (1)
100-119: Consider logging cleanup failures for test debugging.

The cleanup function catches all exceptions and re-raises after rollback, but doesn't log what failed. This could make debugging flaky tests harder.

♻️ Optional: Add logging for cleanup failures

+import logging
+
+logger = logging.getLogger(__name__)
+
 async def cleanup() -> None:
     try:
         await mgmt_async_session.execute(text("DELETE FROM data_source_sync_runs"))
         ...
     except Exception:
+        logger.exception("Failed to clean management data")
         await mgmt_async_session.rollback()
         raise

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/tests/integration/management/conftest.py` around lines 100 - 119, The cleanup() function swallows exception details—add logging so failures are visible: create or use a module logger (e.g., logger = logging.getLogger(__name__)) and, inside the except Exception block of cleanup(), call logger.exception(...) with a descriptive message (e.g., "cleanup failed during management DB teardown") before calling await mgmt_async_session.rollback() and re-raising; reference the cleanup() function and the mgmt_async_session.execute calls / outbox delete to give context in the log message.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 327-345: The code directly mutates ds.credentials_path after storing secrets which bypasses the aggregate update logic and event/timestamp handling; instead call the aggregate's update method (DataSource.update_connection / ds.update_connection) to set credentials_path and updated_by so the DataSourceUpdated event and timestamps are emitted. Concretely, after awaiting self._secret_store.store(...) remove the direct assignment to ds.credentials_path and invoke ds.update_connection(...) with credentials_path=cred_path (and preserved name/connection_config values when not changing) so all credential-only PATCHes go through the aggregate API.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d04f52c7-ee50-49de-991a-e0777f004c51
📒 Files selected for processing (17)
- src/api/main.py
- src/api/management/application/services/data_source_service.py
- src/api/management/application/services/knowledge_graph_service.py
- src/api/management/presentation/__init__.py
- src/api/management/presentation/data_sources/__init__.py
- src/api/management/presentation/data_sources/models.py
- src/api/management/presentation/data_sources/routes.py
- src/api/management/presentation/knowledge_graphs/__init__.py
- src/api/management/presentation/knowledge_graphs/models.py
- src/api/management/presentation/knowledge_graphs/routes.py
- src/api/tests/integration/management/conftest.py
- src/api/tests/integration/management/test_data_source_api.py
- src/api/tests/integration/management/test_knowledge_graph_api.py
- src/api/tests/unit/management/presentation/__init__.py
- src/api/tests/unit/management/presentation/test_data_source_routes.py
- src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py
- src/api/tests/unit/management/test_architecture.py
if raw_credentials is not None:
    cred_path = f"datasource/{ds.id.value}/credentials"
    await self._secret_store.store(
        path=cred_path,
        tenant_id=self._scope_to_tenant,
        credentials=raw_credentials,
    )
    ds.credentials_path = cred_path

    await self._ds_repo.save(ds)
except IntegrityError as e:
Avoid writing secrets before the database write is known to succeed.
At Line 167 and Line 339, _secret_store.store() runs before the database work has been flushed/committed. If save() or commit then fails, create leaves an orphaned secret and update can rotate credentials even though the request returns an error. It also keeps the transaction open across external I/O. Flush first, then write the secret, and add compensation if anything after the secret write fails.
Also applies to: 337-347
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/application/services/data_source_service.py` around lines
165 - 175, The secret is being stored via _secret_store.store(...) before the
database save/commit (_ds_repo.save(ds)) completes, which can create orphaned
secrets and prolong transactions; change the flow so you persist/flush/commit
the datasource first (call _ds_repo.save(ds) and ensure the transaction is
committed or flushed) and only then call _secret_store.store(...) and set
ds.credentials_path, minimizing time inside the DB transaction; additionally,
add compensation logic around the secret write (e.g., if any subsequent
operation after storing the secret fails, call _secret_store.delete(...) to
remove the orphaned secret) and ensure error handling around the try/except
block (including the IntegrityError handler) is updated to account for the new
ordering.
except ValueError as e:
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=str(e),
    )
Replace message-based ValueError handling with typed service exceptions.
You already use typed exceptions for duplicate and unauthorized cases; not-found/validation should follow the same pattern. Right now the router either echoes str(e) to the client or parses "not found" out of the message to pick the status code, and src/api/management/application/services/data_source_service.py Lines 149-152 include KG/tenant details in those messages.
Also applies to: 167-177
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/presentation/data_sources/routes.py` around lines 70 - 74,
The router currently catches generic ValueError and returns str(e); replace this
with typed service exceptions (e.g., DataSourceNotFoundError and
DataSourceValidationError) so the route can map them to 404 and 400 respectively
instead of echoing/parsing error messages. Update the service (see
data_source_service.py around the 149-152 area) to raise these specific
exceptions (and remove KG/tenant details from their messages), then change the
two ValueError handlers in routes.py (both the 70-74 block and the similar
167-177 block) to catch the new exceptions and raise HTTPException with the
proper status_code and a safe, minimal detail message. Ensure existing
Duplicate* and Unauthorized* exception handling remains unchanged.
all_ds = await service.list_for_knowledge_graph(
    user_id=current_user.user_id.value,
    kg_id=kg_id,
)
total = len(all_ds)
paginated = all_ds[offset : offset + limit]
return DataSourceListResponse(
    items=[DataSourceResponse.from_domain(ds) for ds in paginated],
    total=total,
    offset=offset,
    limit=limit,
)
Push pagination below the route layer.
At Line 91, the handler loads every data source and only then applies offset/limit. That makes latency and memory grow with the full collection size, so this endpoint will degrade as a knowledge graph grows. Have the service/repository return the requested page and total count directly.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/presentation/data_sources/routes.py` around lines 91 -
102, The route is loading all data sources then slicing (offset/limit) which
causes memory/latency issues; update the handler to push pagination into the
service/repository layer by changing the call to
service.list_for_knowledge_graph to accept offset and limit (e.g.
list_for_knowledge_graph(user_id=..., kg_id=..., offset=offset, limit=limit))
and make that service return a paginated result (items + total) instead of a
full list, then construct DataSourceListResponse from that returned items and
total (remove the local len() and slicing/paginated logic and stop using
paginated = all_ds[offset:offset+limit]); also update the underlying repository
methods to perform the database-level pagination and counting so
DataSourceResponse.from_domain is fed only the page of items.
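Database-level pagination as requested here, one page query plus a COUNT returning the (items, total) tuple, can be illustrated with stdlib sqlite3 (the real code would use SQLAlchemy; table and column names below are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE data_sources (id INTEGER PRIMARY KEY, kg_id TEXT, name TEXT)"
)
conn.executemany(
    "INSERT INTO data_sources (kg_id, name) VALUES (?, ?)",
    [("kg-1", f"ds-{i:02d}") for i in range(25)],
)

def find_for_kg(conn, kg_id: str, offset: int, limit: int):
    total = conn.execute(
        "SELECT COUNT(*) FROM data_sources WHERE kg_id = ?", (kg_id,)
    ).fetchone()[0]
    rows = conn.execute(
        "SELECT name FROM data_sources WHERE kg_id = ? ORDER BY id LIMIT ? OFFSET ?",
        (kg_id, limit, offset),
    ).fetchall()
    # Only the requested page ever crosses the repository boundary.
    return [name for (name,) in rows], total

page, total = find_for_kg(conn, "kg-1", offset=10, limit=5)
```

Memory and latency now scale with the page size rather than the full collection.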
except HTTPException:
    raise
except Exception:
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail="Failed to delete knowledge graph",
    )
Catch-all exception handler masks underlying errors.
The broad except Exception clause will catch and hide any unexpected errors as a generic 500 response. This makes debugging difficult and may mask bugs. Consider removing it and letting unexpected exceptions propagate to FastAPI's default error handling, or at least log the exception.
🛡️ Suggested fix

 except HTTPException:
     raise
-except Exception:
-    raise HTTPException(
-        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-        detail="Failed to delete knowledge graph",
-    )
+# Let unexpected exceptions propagate for proper error logging

Or if you need the catch-all:
+import logging
+logger = logging.getLogger(__name__)
+
 except Exception:
+    logger.exception("Unexpected error deleting knowledge graph %s", kg_id)
     raise HTTPException(
         status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
         detail="Failed to delete knowledge graph",
     )

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
except HTTPException:
    raise
# Let unexpected exceptions propagate for proper error logging
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 197
- 203, The catch-all "except Exception:" in the delete knowledge graph route
masks underlying errors; remove that broad except so unexpected exceptions
propagate to FastAPI's default handler, or if you must keep a catch-all, capture
the exception as "except Exception as e:" and call the module logger with
logger.exception("Failed to delete knowledge graph") then re-raise HTTPException
providing contextual detail (e.g., detail=str(e) or a sanitized message) so the
original error is logged; modify the block around HTTPException to either delete
the generic except or replace it with the logged-and-re-raised variant involving
HTTPException, "e", and logger.exception.
…error to SyncRunResponse

- Route credential-only updates through ds.update_connection() instead of directly mutating ds.credentials_path, ensuring DataSourceUpdated event emission and updated_at timestamp update
- Add error field to SyncRunResponse so failed syncs expose their error message

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 4
♻️ Duplicate comments (1)
src/api/management/presentation/data_sources/routes.py (1)
70-74: ⚠️ Potential issue | 🟠 Major

Stop using `ValueError` message parsing/echoing for HTTP mapping.

These handlers currently depend on `str(e)` (including "not found" string matching), which is brittle and can expose internal service details to clients. Please switch to typed service exceptions (not-found vs validation) and map them explicitly to 404/400.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
Also applies to: 188-198, 267-271
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/presentation/data_sources/routes.py` around lines 70 - 74, Current handlers catch ValueError and echo str(e) which is brittle and leaks internals; replace this by introducing typed service exceptions (e.g., ServiceNotFoundError and ServiceValidationError) in the service layer and raise those instead of ValueError, then update the route handlers in routes.py to catch ServiceNotFoundError -> raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="resource not found") and catch ServiceValidationError -> raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e.validation_message) or a sanitized message); remove any logic that parses "not found" from str(e) and ensure the route handlers reference the new exception classes instead of ValueError.
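The typed-exception approach suggested in this prompt could be sketched as follows. The exception class names and the `map_service_error` helper are illustrative stand-ins (the real code would raise `fastapi.HTTPException` in the route handlers), not the actual service API:

```python
class DataSourceNotFoundError(Exception):
    """Raised by the service layer when the data source does not exist."""


class DataSourceValidationError(Exception):
    """Raised by the service layer for invalid input; carries a safe message."""

    def __init__(self, validation_message: str) -> None:
        super().__init__(validation_message)
        self.validation_message = validation_message


def map_service_error(exc: Exception) -> tuple[int, str]:
    """Map a typed service exception to (status_code, detail).

    Detail strings are constant or sanitized — never raw str(exc) from
    arbitrary errors, so internals cannot leak to clients.
    """
    if isinstance(exc, DataSourceNotFoundError):
        return 404, "resource not found"
    if isinstance(exc, DataSourceValidationError):
        return 400, exc.validation_message or "invalid request"
    raise exc  # anything unexpected propagates to the framework handler
```

Because the mapping keys on exception *type* rather than message text, renaming or rewording service errors can no longer silently change HTTP status codes.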
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 258-268: The current code fetches all KG IDs then does N+1 reads
and applies offset/limit after loading (variables kg_ids, kgs, and method
_kg_repo.get_by_id), causing poor scalability; change the flow to apply
pagination at the repo level (do not load all KGs then slice). Update or use a
repository method that returns paginated KGs filtered by tenant (e.g., add or
call _kg_repo.list_for_workspace or _kg_repo.list_by_workspace with offset and
limit and tenant_id parameters) so you fetch only the requested page and avoid
per-id get_by_id loops; adjust callers to expect total count and the paginated
list returned from the repo.
- Around line 390-395: The current deletion uses a single fetch with limit=10000
(in knowledge_graph_service.py) which can leave data sources undeleted; change
the logic around self._ds_repo.find_by_knowledge_graph and deletion so you page
through results until none remain: repeatedly call find_by_knowledge_graph
(using a reasonable page_size/limit) and for each returned batch call
ds.mark_for_deletion(...) and await self._ds_repo.delete(ds) for each item,
looping (incrementing offset or using the repo’s cursor) until the fetched batch
is empty; ensure this loop runs before deleting the KG to avoid integrity
errors.
In `@src/api/management/presentation/knowledge_graphs/routes.py`:
- Around line 61-65: Replace the broad except (InvalidKnowledgeGraphNameError,
ValueError) blocks that inspect and echo ValueError text with explicit typed
exceptions from the KnowledgeGraphService (e.g., KnowledgeGraphNotFoundError,
KnowledgeGraphValidationError) and map each to a fixed, safe HTTPException
message; update the handlers in routes.py (including the other occurrences
around the 178–188 area) to catch those concrete exceptions instead of
ValueError and return appropriate status codes (400 for validation, 404 for
not-found) with non-sensitive, constant detail strings rather than str(e).
In `@src/api/tests/integration/management/conftest.py`:
- Around line 111-113: The DELETE statement that cleans the outbox only targets
aggregate_type values ('KnowledgeGraph','DataSource') but management events here
use lowercase names, so update the cleanup in conftest.py to match
case-insensitively (e.g., use LOWER(aggregate_type) IN
('knowledge_graph','data_source') or add both cased variants) to ensure rows for
the outbox table are removed; locate the DELETE FROM outbox WHERE aggregate_type
... statement in conftest.py and change it accordingly.
---
Duplicate comments:
In `@src/api/management/presentation/data_sources/routes.py`:
- Around line 70-74: Current handlers catch ValueError and echo str(e) which is
brittle and leaks internals; replace this by introducing typed service
exceptions (e.g., ServiceNotFoundError and ServiceValidationError) in the
service layer and raise those instead of ValueError, then update the route
handlers in routes.py to catch ServiceNotFoundError -> raise
HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="resource not
found") and catch ServiceValidationError -> raise
HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e.validation_message) or a sanitized message); remove any logic that
parses "not found" from str(e) and ensure the route handlers reference the new
exception classes instead of ValueError.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 64438ed4-e25d-4eb9-8a3a-9d339ea59a43
📒 Files selected for processing (14)
src/api/management/application/services/data_source_service.py
src/api/management/application/services/knowledge_graph_service.py
src/api/management/infrastructure/repositories/data_source_repository.py
src/api/management/infrastructure/repositories/knowledge_graph_repository.py
src/api/management/ports/repositories.py
src/api/management/presentation/data_sources/routes.py
src/api/management/presentation/knowledge_graphs/routes.py
src/api/tests/integration/management/conftest.py
src/api/tests/integration/management/test_data_source_repository.py
src/api/tests/integration/management/test_knowledge_graph_repository.py
src/api/tests/unit/management/application/test_data_source_service.py
src/api/tests/unit/management/application/test_knowledge_graph_service.py
src/api/tests/unit/management/presentation/test_data_source_routes.py
src/api/tests/unit/management/presentation/test_knowledge_graph_routes.py
data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
    kg_id, offset=0, limit=10000
)
for ds in data_sources:
    ds.mark_for_deletion(deleted_by=user_id)
    await self._ds_repo.delete(ds)
Cascade delete can miss records because of the fixed limit=10000.
If a KG has more than 10,000 data sources, this path won’t delete all children before deleting the KG. That can produce integrity failures and false 500s.
Proposed fix (delete in pages until empty)
- data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
-     kg_id, offset=0, limit=10000
- )
- for ds in data_sources:
-     ds.mark_for_deletion(deleted_by=user_id)
-     await self._ds_repo.delete(ds)
+ page_size = 500
+ while True:
+     data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
+         kg_id, offset=0, limit=page_size
+     )
+     if not data_sources:
+         break
+     for ds in data_sources:
+         ds.mark_for_deletion(deleted_by=user_id)
+         await self._ds_repo.delete(ds)

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
📝 Committable suggestion
- data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
-     kg_id, offset=0, limit=10000
- )
- for ds in data_sources:
-     ds.mark_for_deletion(deleted_by=user_id)
-     await self._ds_repo.delete(ds)
+ page_size = 500
+ while True:
+     data_sources, _ = await self._ds_repo.find_by_knowledge_graph(
+         kg_id, offset=0, limit=page_size
+     )
+     if not data_sources:
+         break
+     for ds in data_sources:
+         ds.mark_for_deletion(deleted_by=user_id)
+         await self._ds_repo.delete(ds)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/application/services/knowledge_graph_service.py` around
lines 390 - 395, The current deletion uses a single fetch with limit=10000 (in
knowledge_graph_service.py) which can leave data sources undeleted; change the
logic around self._ds_repo.find_by_knowledge_graph and deletion so you page
through results until none remain: repeatedly call find_by_knowledge_graph
(using a reasonable page_size/limit) and for each returned batch call
ds.mark_for_deletion(...) and await self._ds_repo.delete(ds) for each item,
looping (incrementing offset or using the repo’s cursor) until the fetched batch
is empty; ensure this loop runs before deleting the KG to avoid integrity
errors.
except (InvalidKnowledgeGraphNameError, ValueError) as e:
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=str(e),
    )
Avoid ValueError string inspection/echo in route error handling.
Using str(e) for client details and "not found" text matching makes status mapping fragile and leaks service internals. Prefer explicit typed exceptions from KnowledgeGraphService (e.g., not-found vs validation) and fixed safe response messages.
As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
Also applies to: 178-188
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/presentation/knowledge_graphs/routes.py` around lines 61 -
65, Replace the broad except (InvalidKnowledgeGraphNameError, ValueError) blocks
that inspect and echo ValueError text with explicit typed exceptions from the
KnowledgeGraphService (e.g., KnowledgeGraphNotFoundError,
KnowledgeGraphValidationError) and map each to a fixed, safe HTTPException
message; update the handlers in routes.py (including the other occurrences
around the 178–188 area) to catch those concrete exceptions instead of
ValueError and return appropriate status codes (400 for validation, 404 for
not-found) with non-sensitive, constant detail strings rather than str(e).
Actionable comments posted: 2
♻️ Duplicate comments (1)
src/api/management/application/services/data_source_service.py (1)
165-174: ⚠️ Potential issue | 🔴 Critical

Secret writes still happen before the database outcome is known.

`_secret_store.store()` runs inside the open transaction and before the data-source write/commit has succeeded. If `save()`/flush/commit then fails, create/update can leave an orphaned or prematurely rotated secret, and the DB transaction stays open across external I/O. Persist/flush first, then write the secret, and add compensation if anything after the secret write fails.

Also applies to: 344-359
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/application/services/data_source_service.py` around lines 165 - 174, The secret is being written via _secret_store.store() before the DB transaction completes (call to self._ds_repo.save(ds)), which can leave orphaned/rotated secrets or keep the transaction open; change the flow in the create/update path so you persist/flush/commit the datasource first (call self._ds_repo.save(ds) and ensure the unit-of-work/transaction is committed or flushed) and only after successful DB commit perform self._secret_store.store(path=cred_path, tenant_id=self._scope_to_tenant, credentials=raw_credentials) and set ds.credentials_path accordingly; if the secret write fails, add compensation logic to remove the newly committed credentials_path (or attempt a DB-side rollback/update to clear credentials_path) so the DB and secret store remain consistent (apply the same fix for the other occurrence around lines 344-359).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 334-357: The code currently calls ds.update_connection() twice
(before and after storing secrets), causing duplicate DataSourceUpdated events;
instead compute the final name, connection_config, and credentials_path first,
then: if raw_credentials is provided, await self._secret_store.store(... ) to
write secrets and set credentials_path to
f"datasource/{ds.id.value}/credentials", then call ds.update_connection(...)
exactly once with the final values (name, connection_config, credentials_path,
updated_by=user_id); if raw_credentials is not provided but
name/connection_config changed, call ds.update_connection(...) once with the
computed final values; reference ds.update_connection,
DataSource.update_connection, _secret_store.store, raw_credentials, and
credentials_path when making this change.
In `@src/api/management/presentation/data_sources/models.py`:
- Around line 136-156: The from_domain classmethod on SyncRunResponse currently
forwards DataSourceSyncRun.error verbatim into the API response, which can leak
internal details; change SyncRunResponse.from_domain to not include raw
sync_run.error directly and instead set the response error to a sanitized
client-facing value (e.g., a short reason string or enum code) by calling a new
helper (e.g., sanitize_sync_error or map_sync_error_to_code) or returning None
when appropriate, and ensure the original sync_run.error is preserved only in
logs/observability (log or attach to internal diagnostics) rather than returned
to clients.
---
Duplicate comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 165-174: The secret is being written via _secret_store.store()
before the DB transaction completes (call to self._ds_repo.save(ds)), which can
leave orphaned/rotated secrets or keep the transaction open; change the flow in
the create/update path so you persist/flush/commit the datasource first (call
self._ds_repo.save(ds) and ensure the unit-of-work/transaction is committed or
flushed) and only after successful DB commit perform
self._secret_store.store(path=cred_path, tenant_id=self._scope_to_tenant,
credentials=raw_credentials) and set ds.credentials_path accordingly; if the
secret write fails, add compensation logic to remove the newly committed
credentials_path (or attempt a DB-side rollback/update to clear
credentials_path) so the DB and secret store remain consistent (apply the same
fix for the other occurrence around lines 344-359).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7e9a1681-f3bb-455d-8a8a-744c110a36e2
📒 Files selected for processing (2)
src/api/management/application/services/data_source_service.py
src/api/management/presentation/data_sources/models.py
if name is not None or connection_config is not None:
    ds.update_connection(
        name=name if name is not None else ds.name,
        connection_config=connection_config
        if connection_config is not None
        else ds.connection_config,
        credentials_path=ds.credentials_path,
        updated_by=user_id,
    )

if raw_credentials is not None:
    cred_path = f"datasource/{ds.id.value}/credentials"
    await self._secret_store.store(
        path=cred_path,
        tenant_id=self._scope_to_tenant,
        credentials=raw_credentials,
    )
    # Update via aggregate method to emit event and update timestamps
    ds.update_connection(
        name=ds.name,
        connection_config=ds.connection_config,
        credentials_path=cred_path,
        updated_by=user_id,
    )
A combined PATCH can emit two DataSourceUpdated events.
When a request changes name/config and credentials together, this method calls ds.update_connection() once before the secret write and again after it. In src/api/management/domain/aggregates/data_source.py, DataSource.update_connection() appends a DataSourceUpdated event on every call, so one API request produces duplicate update events and timestamp churn. Compute the final values first and invoke the aggregate once.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/application/services/data_source_service.py` around lines
334 - 357, The code currently calls ds.update_connection() twice (before and
after storing secrets), causing duplicate DataSourceUpdated events; instead
compute the final name, connection_config, and credentials_path first, then: if
raw_credentials is provided, await self._secret_store.store(... ) to write
secrets and set credentials_path to f"datasource/{ds.id.value}/credentials",
then call ds.update_connection(...) exactly once with the final values (name,
connection_config, credentials_path, updated_by=user_id); if raw_credentials is
not provided but name/connection_config changed, call ds.update_connection(...)
once with the computed final values; reference ds.update_connection,
DataSource.update_connection, _secret_store.store, raw_credentials, and
credentials_path when making this change.
    error: str | None
    created_at: datetime

    @classmethod
    def from_domain(cls, sync_run: DataSourceSyncRun) -> SyncRunResponse:
        """Convert domain DataSourceSyncRun entity to API response.

        Args:
            sync_run: DataSourceSyncRun domain entity

        Returns:
            SyncRunResponse with sync run details
        """
        return cls(
            id=sync_run.id,
            data_source_id=sync_run.data_source_id,
            status=sync_run.status,
            started_at=sync_run.started_at,
            completed_at=sync_run.completed_at,
            error=sync_run.error,
            created_at=sync_run.created_at,
Avoid making raw sync failure text part of the public API.
sync_run.error is forwarded verbatim into the response model. If that field carries upstream exception text, failed syncs can leak internal URLs, identifiers, or secrets. Prefer a sanitized client-facing reason/code here and keep the raw error for logs/observability.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/presentation/data_sources/models.py` around lines 136 -
156, The from_domain classmethod on SyncRunResponse currently forwards
DataSourceSyncRun.error verbatim into the API response, which can leak internal
details; change SyncRunResponse.from_domain to not include raw sync_run.error
directly and instead set the response error to a sanitized client-facing value
(e.g., a short reason string or enum code) by calling a new helper (e.g.,
sanitize_sync_error or map_sync_error_to_code) or returning None when
appropriate, and ensure the original sync_run.error is preserved only in
logs/observability (log or attach to internal diagnostics) rather than returned
to clients.
…atch
- Replace limit=10000 magic number with batched while-loop (100 per batch)
for cascade deleting data sources when a knowledge graph is deleted
- Fix outbox cleanup SQL to use snake_case aggregate types matching what
repositories actually write ('knowledge_graph', 'data_source')
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
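The commit above switches the cleanup predicate to the snake_case aggregate types the repositories actually write; a minimal in-memory sqlite check of that predicate (table shape simplified for the demo — the real cleanup runs against PostgreSQL, but the `IN` predicate behaves the same way):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER, aggregate_type TEXT)")
conn.executemany(
    "INSERT INTO outbox VALUES (?, ?)",
    [(1, "knowledge_graph"), (2, "data_source"), (3, "workspace")],
)
conn.execute(
    "DELETE FROM outbox WHERE aggregate_type IN ('knowledge_graph', 'data_source')"
)
remaining = [row[0] for row in conn.execute("SELECT id FROM outbox")]
# remaining == [3]: only the unrelated aggregate's row survives the cleanup
```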
Summary
Completes the Management context walking skeleton with REST API endpoints for KnowledgeGraph and DataSource CRUD.
Endpoints
- POST /management/workspaces/{ws_id}/knowledge-graphs
- GET /management/workspaces/{ws_id}/knowledge-graphs
- GET /management/knowledge-graphs/{kg_id}
- PATCH /management/knowledge-graphs/{kg_id}
- DELETE /management/knowledge-graphs/{kg_id}
- POST /management/knowledge-graphs/{kg_id}/data-sources
- GET /management/knowledge-graphs/{kg_id}/data-sources
- GET /management/data-sources/{ds_id}
- PATCH /management/data-sources/{ds_id}
- DELETE /management/data-sources/{ds_id}
- POST /management/data-sources/{ds_id}/sync

Pre-work fixes included
- `DataSourceService.create/update()` now catches `IntegrityError` → `DuplicateDataSourceNameError`
- `KnowledgeGraphService.update()` supports partial updates (optional name/description)

Key design decisions
- Pagination: `offset`, `limit` (default 20, max 100), `total`, `items`
- `has_credentials: bool` in response, never expose secrets
- `None`
- `get()` returns 404 for both not-found and access-denied (no existence leakage)

Test plan
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests