Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from infrastructure.version import __version__
from iam.infrastructure.outbox import IAMEventTranslator
from management.infrastructure.outbox import ManagementEventTranslator
from infrastructure.outbox.composite import CompositeEventHandler
from infrastructure.outbox.spicedb_handler import SpiceDBEventHandler
from infrastructure.outbox.event_sources.postgres_notify import (
Expand Down Expand Up @@ -134,7 +135,12 @@ async def kartograph_lifespan(app: FastAPI):
authz=authz,
)
handler.register(spicedb_handler, handler_name="iam")
# Future: handler.register(management_handler, handler_name="management")
# Register SpiceDB handler wrapping the Management translator
management_spicedb_handler = SpiceDBEventHandler(
translator=ManagementEventTranslator(),
authz=authz,
)
handler.register(management_spicedb_handler, handler_name="management")

# Create event source for real-time NOTIFY processing
event_source = PostgresNotifyEventSource(
Expand Down
7 changes: 4 additions & 3 deletions src/api/management/infrastructure/outbox/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Management outbox integration.

Provides event serialization for Management domain events
to be stored in the transactional outbox.
Provides event serialization and SpiceDB translation for Management
domain events processed through the transactional outbox.
"""

from management.infrastructure.outbox.serializer import ManagementEventSerializer
from management.infrastructure.outbox.translator import ManagementEventTranslator

__all__ = ["ManagementEventSerializer"]
__all__ = ["ManagementEventSerializer", "ManagementEventTranslator"]
295 changes: 295 additions & 0 deletions src/api/management/infrastructure/outbox/translator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
"""Management-specific event translator for SpiceDB operations.

This module provides the translation layer between Management domain events
and SpiceDB relationship operations. It uses type-safe enums for all resource
types and relations to avoid magic strings.

The translator uses a dictionary-based dispatch approach with automatic
validation to ensure all domain events have corresponding handlers.
"""

from __future__ import annotations

from typing import Any, Callable, get_args

from management.domain.events import (
DataSourceCreated,
DataSourceDeleted,
DataSourceSyncRequested,
DataSourceUpdated,
DomainEvent,
KnowledgeGraphCreated,
KnowledgeGraphDeleted,
KnowledgeGraphUpdated,
)
from shared_kernel.authorization.types import RelationType, ResourceType
from shared_kernel.outbox.operations import (
DeleteRelationship,
DeleteRelationshipsByFilter,
SpiceDBOperation,
WriteRelationship,
)

# Build registry mapping event type names to classes
_EVENT_REGISTRY: dict[str, type] = {cls.__name__: cls for cls in get_args(DomainEvent)}


class ManagementEventTranslator:
"""Translates Management domain events to SpiceDB operations.

This translator handles all Management-specific events defined in the
DomainEvent type alias. Handler methods are mapped via a dictionary
and validated at initialization to ensure completeness.

Management events establish authorization relationships for knowledge
graphs and data sources, linking them to their parent workspaces,
knowledge graphs, and tenants in the SpiceDB permission system.
"""

def __init__(self) -> None:
"""Initialize translator and validate all events have handlers."""
# Map event classes to handler methods
self._handlers: dict[
type, Callable[[dict[str, Any]], list[SpiceDBOperation]]
] = {
KnowledgeGraphCreated: self._translate_knowledge_graph_created,
KnowledgeGraphUpdated: self._translate_knowledge_graph_updated,
KnowledgeGraphDeleted: self._translate_knowledge_graph_deleted,
DataSourceCreated: self._translate_data_source_created,
DataSourceUpdated: self._translate_data_source_updated,
DataSourceDeleted: self._translate_data_source_deleted,
DataSourceSyncRequested: self._translate_data_source_sync_requested,
}

# Validate all domain events have handlers
self._validate_handlers()

def _validate_handlers(self) -> None:
"""Ensure all domain events have handler methods.

This is primarily a developer convenience - Kartograph
will fail to start if a DomainEvent doesn't have a registered handler.

Raises:
ValueError: If any domain events are missing handlers
"""
event_types = set(get_args(DomainEvent))
handler_types = set(self._handlers.keys())

missing = event_types - handler_types
if missing:
missing_names = [e.__name__ for e in missing]
raise ValueError(
f"Missing translation handlers for events: {missing_names}"
)

def supported_event_types(self) -> frozenset[str]:
"""Return the event type names this translator handles."""
return frozenset(cls.__name__ for cls in self._handlers.keys())

def translate(
self,
event_type: str,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Convert an event payload to SpiceDB operations.

Args:
event_type: The name of the event type
payload: The serialized event data

Returns:
List of SpiceDB operations to execute

Raises:
ValueError: If the event type is not supported
"""
# Get event class from registry
event_class = _EVENT_REGISTRY.get(event_type)
if not event_class:
raise ValueError(f"Unknown event type: {event_type}")

# Look up handler method
handler = self._handlers.get(event_class)
if not handler:
raise ValueError(f"No handler for event: {event_type}")

return handler(payload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Normalize malformed payload failures to ValueError at the translation boundary.

At Line 117, missing required fields currently bubble as KeyError from handler internals, which makes outbox failure handling/error classification inconsistent with the explicit ValueError contract used for unsupported events.

Proposed fix
     def translate(
         self,
         event_type: str,
         payload: dict[str, Any],
     ) -> list[SpiceDBOperation]:
@@
-        return handler(payload)
+        try:
+            return handler(payload)
+        except KeyError as exc:
+            missing_key = exc.args[0]
+            raise ValueError(
+                f"Invalid payload for event {event_type}: "
+                f"missing required field '{missing_key}'"
+            ) from exc

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return handler(payload)
try:
return handler(payload)
except KeyError as exc:
missing_key = exc.args[0]
raise ValueError(
f"Invalid payload for event {event_type}: "
f"missing required field '{missing_key}'"
) from exc
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/management/infrastructure/outbox/translator.py` at line 117, The
translator currently returns handler(payload) directly which allows
missing-field KeyError exceptions from handler internals to leak; wrap the call
to handler (the invocation returning at Line 117) in a try/except that catches
KeyError (and optionally TypeError for malformed payload shapes) and re-raise a
ValueError with a clear message preserving the original exception as the cause
so malformed payloads are normalized at the translation boundary (i.e., convert
KeyError -> ValueError before propagating from the translator function that
calls handler).


def _translate_knowledge_graph_created(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate KnowledgeGraphCreated to workspace and tenant relationship writes.

Creates two relationships:
- knowledge_graph:<id>#workspace@workspace:<workspace_id>
- knowledge_graph:<id>#tenant@tenant:<tenant_id>

These relationships enable permission inheritance: workspace members
inherit access to knowledge graphs within that workspace.
"""
return [
WriteRelationship(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.WORKSPACE,
subject_type=ResourceType.WORKSPACE,
subject_id=payload["workspace_id"],
),
WriteRelationship(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.TENANT,
subject_type=ResourceType.TENANT,
subject_id=payload["tenant_id"],
),
]

def _translate_knowledge_graph_updated(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate KnowledgeGraphUpdated - no SpiceDB changes needed.

Metadata updates (name, description) do not affect authorization
relationships. The workspace and tenant associations remain unchanged.
"""
return []

def _translate_knowledge_graph_deleted(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate KnowledgeGraphDeleted to delete all relationships.

Removes the workspace and tenant relationships created during
knowledge graph creation, plus any direct user permission grants
(admin, editor, viewer) using filter-based deletion.

Order: direct deletes first, then filter deletes.

Deletes:
- knowledge_graph:<id>#workspace@workspace:<workspace_id>
- knowledge_graph:<id>#tenant@tenant:<tenant_id>
- knowledge_graph:<id>#admin@* (filter)
- knowledge_graph:<id>#editor@* (filter)
- knowledge_graph:<id>#viewer@* (filter)
"""
return [
# Direct deletes for workspace and tenant
DeleteRelationship(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.WORKSPACE,
subject_type=ResourceType.WORKSPACE,
subject_id=payload["workspace_id"],
),
DeleteRelationship(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.TENANT,
subject_type=ResourceType.TENANT,
subject_id=payload["tenant_id"],
),
# Filter deletes for any direct admin/editor/viewer grants
DeleteRelationshipsByFilter(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.ADMIN,
),
DeleteRelationshipsByFilter(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.EDITOR,
),
DeleteRelationshipsByFilter(
resource_type=ResourceType.KNOWLEDGE_GRAPH,
resource_id=payload["knowledge_graph_id"],
relation=RelationType.VIEWER,
),
]

def _translate_data_source_created(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate DataSourceCreated to knowledge graph and tenant relationship writes.

Creates two relationships:
- data_source:<id>#knowledge_graph@knowledge_graph:<kg_id>
- data_source:<id>#tenant@tenant:<tenant_id>

These relationships enable permission inheritance: knowledge graph
members inherit access to data sources within that knowledge graph.
"""
return [
WriteRelationship(
resource_type=ResourceType.DATA_SOURCE,
resource_id=payload["data_source_id"],
relation=RelationType.KNOWLEDGE_GRAPH,
subject_type=ResourceType.KNOWLEDGE_GRAPH,
subject_id=payload["knowledge_graph_id"],
),
WriteRelationship(
resource_type=ResourceType.DATA_SOURCE,
resource_id=payload["data_source_id"],
relation=RelationType.TENANT,
subject_type=ResourceType.TENANT,
subject_id=payload["tenant_id"],
),
]

def _translate_data_source_updated(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate DataSourceUpdated - no SpiceDB changes needed.

Connection configuration updates do not affect authorization
relationships. The knowledge graph and tenant associations remain
unchanged.
"""
return []

def _translate_data_source_deleted(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate DataSourceDeleted to delete all relationships.

Removes the knowledge graph and tenant relationships created during
data source creation.

Deletes:
- data_source:<id>#knowledge_graph@knowledge_graph:<kg_id>
- data_source:<id>#tenant@tenant:<tenant_id>
"""
return [
DeleteRelationship(
resource_type=ResourceType.DATA_SOURCE,
resource_id=payload["data_source_id"],
relation=RelationType.KNOWLEDGE_GRAPH,
subject_type=ResourceType.KNOWLEDGE_GRAPH,
subject_id=payload["knowledge_graph_id"],
),
DeleteRelationship(
resource_type=ResourceType.DATA_SOURCE,
resource_id=payload["data_source_id"],
relation=RelationType.TENANT,
subject_type=ResourceType.TENANT,
subject_id=payload["tenant_id"],
),
]

def _translate_data_source_sync_requested(
self,
payload: dict[str, Any],
) -> list[SpiceDBOperation]:
"""Translate DataSourceSyncRequested - no SpiceDB changes needed.

Sync requests do not affect authorization relationships. This event
exists for consumption by the Ingestion bounded context to trigger
data source synchronization workflows.
"""
return []
Loading
Loading