-
Notifications
You must be signed in to change notification settings - Fork 2
feat(management): add FastAPI routes and dependency injection (AIHCM-185) #303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
954a84e
d296a87
5103aa6
724aa0f
cfce711
74e6ab0
634108b
5b7d0a6
800e749
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
|
|
||
| from datetime import UTC, datetime | ||
|
|
||
| from sqlalchemy.exc import IntegrityError | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
| from ulid import ULID | ||
|
|
||
|
|
@@ -18,7 +19,7 @@ | |
| from management.domain.aggregates import DataSource | ||
| from management.domain.entities import DataSourceSyncRun | ||
| from management.domain.value_objects import DataSourceId, KnowledgeGraphId | ||
| from management.ports.exceptions import UnauthorizedError | ||
| from management.ports.exceptions import DuplicateDataSourceNameError, UnauthorizedError | ||
| from management.ports.repositories import ( | ||
| IDataSourceRepository, | ||
| IDataSourceSyncRunRepository, | ||
|
|
@@ -150,26 +151,36 @@ async def create( | |
| if kg.tenant_id != self._scope_to_tenant: | ||
| raise ValueError(f"Knowledge graph {kg_id} belongs to different tenant") | ||
|
|
||
| async with self._session.begin(): | ||
| ds = DataSource.create( | ||
| knowledge_graph_id=kg_id, | ||
| tenant_id=self._scope_to_tenant, | ||
| name=name, | ||
| adapter_type=adapter_type, | ||
| connection_config=connection_config, | ||
| created_by=user_id, | ||
| ) | ||
|
|
||
| if raw_credentials is not None: | ||
| cred_path = f"datasource/{ds.id.value}/credentials" | ||
| await self._secret_store.store( | ||
| path=cred_path, | ||
| try: | ||
| async with self._session.begin(): | ||
| ds = DataSource.create( | ||
| knowledge_graph_id=kg_id, | ||
| tenant_id=self._scope_to_tenant, | ||
| credentials=raw_credentials, | ||
| name=name, | ||
| adapter_type=adapter_type, | ||
| connection_config=connection_config, | ||
| created_by=user_id, | ||
| ) | ||
| ds.credentials_path = cred_path | ||
|
|
||
| await self._ds_repo.save(ds) | ||
| if raw_credentials is not None: | ||
| cred_path = f"datasource/{ds.id.value}/credentials" | ||
| await self._secret_store.store( | ||
| path=cred_path, | ||
| tenant_id=self._scope_to_tenant, | ||
| credentials=raw_credentials, | ||
| ) | ||
| ds.credentials_path = cred_path | ||
|
|
||
| await self._ds_repo.save(ds) | ||
| except IntegrityError as e: | ||
| if "uq_data_sources_kg_name" in str(e): | ||
| self._probe.data_source_creation_failed( | ||
| kg_id=kg_id, name=name, error="duplicate name" | ||
| ) | ||
| raise DuplicateDataSourceNameError( | ||
| f"Data source '{name}' already exists in knowledge graph '{kg_id}'" | ||
| ) from e | ||
| raise | ||
|
|
||
| self._probe.data_source_created( | ||
| ds_id=ds.id.value, | ||
|
|
@@ -219,15 +230,20 @@ async def list_for_knowledge_graph( | |
| self, | ||
| user_id: str, | ||
| kg_id: str, | ||
| ) -> list[DataSource]: | ||
| """List data sources for a knowledge graph. | ||
| *, | ||
| offset: int = 0, | ||
| limit: int = 20, | ||
| ) -> tuple[list[DataSource], int]: | ||
| """List data sources for a knowledge graph with pagination. | ||
|
|
||
| Args: | ||
| user_id: The user requesting the list | ||
| kg_id: The knowledge graph to list DSes for | ||
| offset: Number of records to skip | ||
| limit: Maximum number of records to return | ||
|
|
||
| Returns: | ||
| List of DataSource aggregates | ||
| Tuple of (paginated DataSource aggregates, total count) | ||
|
|
||
| Raises: | ||
| UnauthorizedError: If user lacks VIEW permission on KG | ||
|
|
@@ -254,14 +270,16 @@ async def list_for_knowledge_graph( | |
| if kg is None or kg.tenant_id != self._scope_to_tenant: | ||
| raise UnauthorizedError(f"Knowledge graph {kg_id} not accessible") | ||
|
|
||
| data_sources = await self._ds_repo.find_by_knowledge_graph(kg_id) | ||
| data_sources, total = await self._ds_repo.find_by_knowledge_graph( | ||
| kg_id, offset=offset, limit=limit | ||
| ) | ||
|
|
||
| self._probe.data_sources_listed( | ||
| kg_id=kg_id, | ||
| count=len(data_sources), | ||
| ) | ||
|
|
||
| return data_sources | ||
| return data_sources, total | ||
|
|
||
| async def update( | ||
| self, | ||
|
|
@@ -311,27 +329,45 @@ async def update( | |
| if ds.tenant_id != self._scope_to_tenant: | ||
| raise ValueError(f"Data source {ds_id} not found") | ||
|
|
||
| async with self._session.begin(): | ||
| if name is not None or connection_config is not None: | ||
| ds.update_connection( | ||
| name=name if name is not None else ds.name, | ||
| connection_config=connection_config | ||
| if connection_config is not None | ||
| else ds.connection_config, | ||
| credentials_path=ds.credentials_path, | ||
| updated_by=user_id, | ||
| ) | ||
|
|
||
| if raw_credentials is not None: | ||
| cred_path = f"datasource/{ds.id.value}/credentials" | ||
| await self._secret_store.store( | ||
| path=cred_path, | ||
| tenant_id=self._scope_to_tenant, | ||
| credentials=raw_credentials, | ||
| try: | ||
| async with self._session.begin(): | ||
| if name is not None or connection_config is not None: | ||
| ds.update_connection( | ||
| name=name if name is not None else ds.name, | ||
| connection_config=connection_config | ||
| if connection_config is not None | ||
| else ds.connection_config, | ||
| credentials_path=ds.credentials_path, | ||
| updated_by=user_id, | ||
| ) | ||
|
|
||
| if raw_credentials is not None: | ||
| cred_path = f"datasource/{ds.id.value}/credentials" | ||
| await self._secret_store.store( | ||
| path=cred_path, | ||
| tenant_id=self._scope_to_tenant, | ||
| credentials=raw_credentials, | ||
| ) | ||
| # Update via aggregate method to emit event and update timestamps | ||
| ds.update_connection( | ||
| name=ds.name, | ||
| connection_config=ds.connection_config, | ||
| credentials_path=cred_path, | ||
| updated_by=user_id, | ||
| ) | ||
|
Comment on lines
+334
to
+357
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A combined PATCH can emit two When a request changes name/config and credentials together, this method calls 🤖 Prompt for AI Agents |
||
|
|
||
| await self._ds_repo.save(ds) | ||
| except IntegrityError as e: | ||
| if "uq_data_sources_kg_name" in str(e): | ||
| self._probe.data_source_creation_failed( | ||
| kg_id=ds.knowledge_graph_id, | ||
| name=name or ds.name, | ||
| error="duplicate name", | ||
| ) | ||
| ds.credentials_path = cred_path | ||
|
|
||
| await self._ds_repo.save(ds) | ||
| raise DuplicateDataSourceNameError( | ||
| f"Data source '{name or ds.name}' already exists in knowledge graph '{ds.knowledge_graph_id}'" | ||
| ) from e | ||
| raise | ||
|
|
||
| if name is not None: | ||
| self._probe.data_source_updated(ds_id=ds_id, name=name) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid writing secrets before the database write is known to succeed.
At Line 167 and Line 339,
_secret_store.store()runs before the database work has been flushed/committed. Ifsave()or commit then fails, create leaves an orphaned secret and update can rotate credentials even though the request returns an error. It also keeps the transaction open across external I/O. Flush first, then write the secret, and add compensation if anything after the secret write fails.Also applies to: 337-347
🤖 Prompt for AI Agents