diff --git a/src/openhound_okta/graph.py b/src/openhound_okta/graph.py index fb5ae68..754c632 100644 --- a/src/openhound_okta/graph.py +++ b/src/openhound_okta/graph.py @@ -11,6 +11,7 @@ @dataclass class OktaNodeProperties(BaseProperties): tenant: str + tenant_domain: str id: str diff --git a/src/openhound_okta/lookup.py b/src/openhound_okta/lookup.py index 87a10be..ecfc7da 100644 --- a/src/openhound_okta/lookup.py +++ b/src/openhound_okta/lookup.py @@ -21,13 +21,34 @@ def has_role_permission(self, role_id: str, permission: str) -> bool: f"""SELECT label FROM {self.schema}.custom_role_permissions WHERE role_id = ? AND label = ?""", [role_id, permission], ) - return res is not None + return res + + @lru_cache + def application_by_id(self, app_id: str) -> bool: + res = self._find_single_object( + f"""SELECT id FROM {self.schema}.applications WHERE id = ?""", + [app_id], + ) + return res + + @lru_cache + def application_settings(self, app_id: str) -> bool: + res = self._find_single_object( + f"""SELECT settings FROM {self.schema}.applications WHERE id = ?""", + [app_id], + ) + return res @lru_cache def all_groups(self): res = self._find_all_objects(f"""SELECT id FROM {self.schema}.groups""") return res + @lru_cache + def non_admin_groups(self): + res = self._find_all_objects(f"""SELECT id FROM {self.schema}.non_admin_groups""") + return res + @lru_cache def all_users(self): res = self._find_all_objects(f"""SELECT id FROM {self.schema}.users""") @@ -43,6 +64,57 @@ def all_applications(self): res = self._find_all_objects(f"""SELECT id FROM {self.schema}.applications""") return res + @lru_cache + def application_ids_by_name(self, app_name: str): + res = self._find_all_objects( + f"""SELECT id FROM {self.schema}.applications WHERE name = ?""", + [app_name], + ) + return res + + @lru_cache + def application_secret_ids(self, app_id: str): + res = self._find_all_objects( + f"""SELECT id FROM {self.schema}.application_secrets WHERE app_id = ?""", + [app_id], + ) + return res + + @lru_cache + def resource_set_application_ids(self, resource_set_id: str): + return self._resource_set_resource_ids( + resource_set_id, "apps", self.all_applications() + ) + + @lru_cache + def resource_set_group_ids(self, resource_set_id: str): + return self._resource_set_resource_ids( + resource_set_id, "groups", self.all_groups() + ) + + @lru_cache + def resource_set_non_admin_group_ids(self, resource_set_id: str): + resource_set_groups = set(self.resource_set_group_ids(resource_set_id)) + non_admin_groups = {group_id for (group_id,) in self.non_admin_groups()} + return tuple(sorted(resource_set_groups & non_admin_groups)) + + def _resource_set_resource_ids( + self, resource_set_id: str, resource_type: str, all_resource_rows + ): + rows = self._find_all_objects( + f"""SELECT orn FROM {self.schema}.resources WHERE resource_set_id = ? AND contains(orn, ?)""", + [resource_set_id, f":{resource_type}"], + ) + + resource_ids: set[str] = set() + for (orn,) in rows: + split_orn = orn.split(":") + if len(split_orn) == 5 and split_orn[-1] == resource_type: + resource_ids.update(resource_id for (resource_id,) in all_resource_rows) + elif len(split_orn) == 6 and split_orn[-2] == resource_type: + resource_ids.add(split_orn[-1]) + return tuple(sorted(resource_ids)) + @lru_cache def all_policies(self): res = self._find_all_objects(f"""SELECT id FROM {self.schema}.policies""") @@ -70,14 +142,7 @@ def all_devices(self): @lru_cache def manager_id(self, manager_login: str): res = self._find_single_object( - f"""SELECT id, json_value(profile, 'login') AS login FROM {self.schema}.users WHERE login = ?""", + f"""SELECT id FROM {self.schema}.users WHERE json_extract_string(profile, '$.login') = ?""", [manager_login], ) return res - - # @lru_cache - # def group_member_ids(self, group_id: str): - # res = self._find_all_objects( - # f"""SELECT id FROM {self.schema}.group_memberships WHERE group_id = ?""", - # [group_id] - # return res diff --git a/src/openhound_okta/main.py b/src/openhound_okta/main.py index a3842e4..1149d05 100644 --- a/src/openhound_okta/main.py +++ b/src/openhound_okta/main.py @@ -1,11 +1,12 @@ +import dlt from dlt.extract.source import DltSource from openhound.core.app import OpenHound from openhound.core.collect import CollectContext from openhound.core.convert import ConvertContext from openhound.core.preproc import PreProcContext -from openhound_okta.transforms import transforms from openhound_okta.lookup import OktaLookup +from openhound_okta.transforms import transforms app = OpenHound("okta", source_kind="Okta", help="OpenGraph collector for Okta") @@ -23,15 +24,16 @@ def collect(ctx: CollectContext) -> DltSource: @app.convert(lookup=OktaLookup) -def convert(ctx: ConvertContext) -> tuple[DltSource, dict]: +def convert(ctx: ConvertContext): """Register a Typer CLI command that converts previously collected Okta resources into OpenGraph nodes and edges. Args: ctx (ConvertContext): Returns DLT pipeline context. """ from openhound_okta.source import source as okta_source - - return okta_source(), {"tenant": "somethingsomething"} + from urllib.parse import urlparse + tenant_url = dlt.secrets.get("sources.source.okta.credentials.base_url") + return okta_source(), {"tenant": urlparse(tenant_url).netloc} @app.preproc(transformer=transforms) @@ -41,14 +43,14 @@ def preprocess(ctx: PreProcContext): "users": "users", "groups": "groups", "applications": "applications", + "application_secrets": "application_secrets", "devices": "devices", "authorization_servers": "authorization_servers", "identity_providers": "identity_providers", "policies": "policies", + "resources": "resources", "user_role_assignments": "user_role_assignments", "group_role_assignments": "group_role_assignments", "client_role_assignments": "client_role_assignments", "custom_role_permissions": "custom_role_permissions", - # "application_secrets": "application_secrets", - # "custom_role_permissions": "custom_role_permissions", } diff --git a/src/openhound_okta/models/agent.py b/src/openhound_okta/models/agent.py index 01da0ab..225e9f6 100644 --- a/src/openhound_okta/models/agent.py +++ b/src/openhound_okta/models/agent.py @@ -83,6 +83,7 @@ def as_node(self): kinds=[nk.AGENT], properties=AgentProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, @@ -95,10 +96,13 @@ def as_node(self): @property def _hosts_agent_edge(self): - # TODO: It seems that the agent name in Okta has TEST- prepended. Check the conditional logic if self.agent_type == "AD": + # The agent name has a prefix that needs to be stripped before matching is possible + agent_name_split = self.name.split("-") + agent_name = '-'.join(agent_name_split[1:]) + agent_match = f"{agent_name.upper()}.{self.agent_pool_name.upper()}" match_with = PropertyMatch( - key="domain", value=f"{self.name}.{self.agent_pool_name}" + key="name", value=agent_match ) yield Edge( start=ConditionalEdgePath( diff --git a/src/openhound_okta/models/agent_pool.py b/src/openhound_okta/models/agent_pool.py index f756a3f..7ba2d6b 100644 --- a/src/openhound_okta/models/agent_pool.py +++ b/src/openhound_okta/models/agent_pool.py @@ -83,6 +83,7 @@ def as_node(self): kinds=[nk.AGENT_POOL], properties=AgentPoolProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, diff --git a/src/openhound_okta/models/api_service.py b/src/openhound_okta/models/api_service.py index 6ac54ab..d575abc 100644 --- a/src/openhound_okta/models/api_service.py +++ b/src/openhound_okta/models/api_service.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime from openhound.core.asset import BaseAsset, EdgeDef, NodeDef @@ -17,9 +17,6 @@ class ApiServiceProperties(OktaNodeProperties): app_type: str created_at: datetime oauth_scopes: list[str] | None = None - collected: bool = field( - default=True, metadata={"description": "Collected/generated by OpenHound"} - ) @app.asset( @@ -64,6 +61,7 @@ def as_node(self): kinds=[nk.INTEGRATION], properties=ApiServiceProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, diff --git a/src/openhound_okta/models/api_token.py b/src/openhound_okta/models/api_token.py index 277f63f..44e3ff3 100644 --- a/src/openhound_okta/models/api_token.py +++ b/src/openhound_okta/models/api_token.py @@ -58,6 +58,7 @@ def as_node(self): kinds=[nk.API_TOKEN], properties=ApiTokenProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, diff --git a/src/openhound_okta/models/application.py b/src/openhound_okta/models/application.py index 0ef9a22..775b964 100644 --- a/src/openhound_okta/models/application.py +++ b/src/openhound_okta/models/application.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime from typing import ClassVar @@ -8,8 +8,6 @@ Edge, EdgePath, EdgeProperties, - ConditionalEdgePath, - PropertyMatch, ) from pydantic import BaseModel, ConfigDict, Field @@ -28,9 +26,6 @@ class ApplicationProperties(OktaNodeProperties): last_updated: datetime | None = None sign_on_mode: str | None = None orn: str | None = None - collected: bool = field( - default=True, metadata={"description": "Collected/generated by OpenHound"} - ) class JWK(BaseModel): @@ -121,6 +116,7 @@ def as_node(self): kinds=[nk.APPLICATION], properties=ApplicationProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.label or self.name, @@ -136,7 +132,8 @@ def as_node(self): @property def _outbound_jamf_sso_edge(self): - # TODO: Should this be conditional on sign_on_mode == SAML_2_0? + # SAML == SAML_2_0 + # SWA == SECURE_PASSWORD_STORE, BROWSER_PLUGIN or AUTO_LOGIN if self.name == "jamfsoftwareserver": jamf_domain = self.settings.app.get("domain") if jamf_domain: @@ -148,38 +145,31 @@ def _outbound_jamf_sso_edge(self): properties=EdgeProperties(traversable=True), ) - # @property - # def _outbound_jamf_swa_edge(self): - # # TODO: Should this be conditional on sign_on_mode SECURE_PASSWORD_STORE, BROWSER_PLUGIN or AUTO_LOGIN? - # if self.name == "jamfsoftwareserver": - # jamf_domain = self.settings.app.get("domain") - # if jamf_domain: - # jamf_domain = jamf_domain.replace('"', "") - # yield Edge( - # kind=ek.OUTBOUND_ORG_SSO, - # start=EdgePath(value=self.id, match_by="id"), - # end=EdgePath(value=f"{jamf_domain}-SSO", match_by="id"), - # properties=EdgeProperties(traversable=True), - # ) - # @property # def _outbount_github_sso_edge(self): - # TODO: Github matching based on domain only may cause conflicts, should we actually do this before conditional matching? + # TODO: Wait for the Github Enterprise (v.s. org) implementation is finalized # if self.name == "githubcloud": + # yield Edge( + # kind=ek.OUTBOUND_ORG_SSO, + # start=EdgePath(value=self.id, match_by="id"), + # .... + # properties=EdgeProperties(traversable=True), + # ) # - @property - def _kerberos_sso_edge(self): - # TODO: Check logic - if self.name == "active_directory": - domain = self.label.split(".")[-2] - end_spn = f"HTTP/{domain}.kerberos.okta.com" - condition = PropertyMatch(key="serviceprincipalnames", value=end_spn) - yield Edge( - kind=ek.KERBEROS_SSO, - start=ConditionalEdgePath(kind="User", property_matchers=[condition]), - end=EdgePath(value=self.id, match_by="id"), - ) + # @property + # def _kerberos_sso_edge(self): + # # TODO: matching against arrays needs to be supported by the BH API before this will + # # match with nodes + # if self.name == "active_directory": + # domain = self.label.split(".")[-2] + # end_spn = f"HTTP/{domain}.kerberos.okta.com" + # condition = PropertyMatch(key="serviceprincipalnames", value=end_spn) + # yield Edge( + # kind=ek.KERBEROS_SSO, + # start=ConditionalEdgePath(kind="User", property_matchers=[condition]), + # end=EdgePath(value=self.id, match_by="id"), + # ) @property def _contains_edge(self): @@ -192,6 +182,7 @@ def _contains_edge(self): @property def edges(self): + # Disabled until BHE supports array-based matching + # yield from self._kerberos_sso_edge yield from self._contains_edge yield from self._outbound_jamf_sso_edge - yield from self._kerberos_sso_edge diff --git a/src/openhound_okta/models/application_jwks.py b/src/openhound_okta/models/application_jwks.py index 69f41dc..9592946 100644 --- a/src/openhound_okta/models/application_jwks.py +++ b/src/openhound_okta/models/application_jwks.py @@ -74,6 +74,7 @@ def as_node(self): kinds=[nk.JWK], properties=JWKProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], name=self.display_name, displayname=self.display_name, id=self.id, diff --git a/src/openhound_okta/models/application_secrets.py b/src/openhound_okta/models/application_secrets.py index d468d44..80d7093 100644 --- a/src/openhound_okta/models/application_secrets.py +++ b/src/openhound_okta/models/application_secrets.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from dataclasses import field from datetime import datetime from openhound.core.asset import BaseAsset, EdgeDef, NodeDef @@ -9,8 +10,6 @@ from openhound_okta.kinds import edges as ek, nodes as nk from openhound_okta.main import app -from dataclasses import field - @dataclass class SecretProperties(OktaNodeProperties): @@ -68,6 +67,7 @@ def as_node(self): kinds=[nk.CLIENT_SECRET], properties=SecretProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], name=self.secret_hash, displayname=self.secret_hash, id=self.id, diff --git a/src/openhound_okta/models/application_users.py b/src/openhound_okta/models/application_users.py index ce0773d..91ea07b 100644 --- a/src/openhound_okta/models/application_users.py +++ b/src/openhound_okta/models/application_users.py @@ -7,6 +7,16 @@ from openhound_okta.kinds import edges as ek, nodes as nk from openhound_okta.main import app +# To ignore system apps optionally +SYSTEM_APPS = [ + "okta_oin_submission_tester_app", # Okta OIN Submission Tester + "okta_access_requests_resource_catalog", # Okta Identity Governance + "okta_enduser", # Okta Dashboard + "okta_browser_plugin", # Okta Browser Plugin + "active_directory", # Active Directory, for which there are sync edges + "ldap_interface" # LDAP Interface, similar to AD +] + class Provider(BaseModel): name: str @@ -81,7 +91,6 @@ class Profile(BaseModel): traversable=False, description="User is synced from an Active Directory user", ), - # TODO: Creat new edge kind for AD sync EdgeDef( kind=ek.PASSWORD_SYNC, start=nk.AD_USER, @@ -140,6 +149,8 @@ def _read_password_updates_edge(self): @property def _app_assignment_edge(self): + # Note: Check if we want to filter default assignments (factual to the environment) + # or filter out default assignments (to clean up the graph) if self.scope == "USER": yield Edge( kind=ek.APP_ASSIGNMENT, @@ -150,7 +161,6 @@ def _app_assignment_edge(self): @property def _user_push_poll_edges(self): - # TODO: Verify this logic if self.sync_state == "SYNCHRONIZED": if self.scope == "USER": yield Edge( @@ -169,11 +179,8 @@ def _user_push_poll_edges(self): @property def _password_sync_edge(self): - if self.sync_state == "SYNCHRONIZED" and self.app_name == "active_directory": - if self.scope == "USER" and self.profile.object_sid: - # app_domain = self.app_settings.get("domain") - # ad_domain = app_domain if app_domain else self.app_label - # TODO: Determine the matching between okta user and AD user + if self.sync_state == "SYNCHRONIZED" and self.app_name == "active_directory" and self.profile.object_sid: + if self.scope == "USER": yield Edge( kind=ek.USER_SYNC, start=EdgePath(value=self.profile.object_sid, match_by="id"), @@ -181,7 +188,6 @@ def _password_sync_edge(self): properties=EdgeProperties(traversable=False), ) - # TODO: Ditto, verify matching between okta user and AD user if "OUTBOUND_DEL_AUTH" in self.app_features: yield Edge( kind=ek.PASSWORD_SYNC, @@ -190,16 +196,12 @@ def _password_sync_edge(self): properties=EdgeProperties(traversable=True), ) else: - # Outboundsync, ie. OktaUser--Okta_UserSync->AD???? - # TODO: Determine the matching between okta user and AD user - # is this really based on a group/non-user? yield Edge( kind=ek.USER_SYNC, start=EdgePath(value=self.id, match_by="id"), end=EdgePath(value=self.profile.object_sid, match_by="id"), properties=EdgeProperties(traversable=False), ) - # TODO: Ditto, verify matching between okta user and AD user if "PUSH_PASSWORD_UPDATES" in self.app_features: yield Edge( kind=ek.PASSWORD_SYNC, @@ -210,7 +212,6 @@ def _password_sync_edge(self): @property def _okta_org2org_edges(self): - # TODO: Verify and check SCIM if self.app_name == "okta_org2org": if self.scope == "USER": yield Edge( diff --git a/src/openhound_okta/models/auth_server.py b/src/openhound_okta/models/auth_server.py index 381ce47..64561e0 100644 --- a/src/openhound_okta/models/auth_server.py +++ b/src/openhound_okta/models/auth_server.py @@ -63,6 +63,7 @@ def as_node(self): kinds=[nk.AUTH_SERVER], properties=AuthServerProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, diff --git a/src/openhound_okta/models/built_in_role.py b/src/openhound_okta/models/built_in_role.py index 00009e3..cdc6915 100644 --- a/src/openhound_okta/models/built_in_role.py +++ b/src/openhound_okta/models/built_in_role.py @@ -66,6 +66,7 @@ def as_node(self): kinds=[nk.ROLE], properties=BuiltInRoleProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.type, name=self.type, displayname=self.type, diff --git a/src/openhound_okta/models/client_role_assignment.py b/src/openhound_okta/models/client_role_assignment.py index d3d40f9..c8c626c 100644 --- a/src/openhound_okta/models/client_role_assignment.py +++ b/src/openhound_okta/models/client_role_assignment.py @@ -1,14 +1,15 @@ from dataclasses import dataclass from datetime import datetime -from openhound.core.asset import BaseAsset, EdgeDef, NodeDef +from openhound.core.asset import EdgeDef, NodeDef from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties +from pydantic import BaseModel from pydantic import ConfigDict, Field from openhound_okta.graph import OktaNode, OktaNodeProperties from openhound_okta.kinds import edges as ek, nodes as nk from openhound_okta.main import app -from pydantic import BaseModel +from openhound_okta.models.role_assignment import RoleAssignment @dataclass @@ -44,6 +45,17 @@ class Target(BaseModel): groups: list[Group] | None = None +class HREF(BaseModel): + href: str + + +class Links(BaseModel): + source: HREF | None = None + users: HREF | None = None + apps: HREF | None = None + groups: HREF | None = None + + class Embedded(BaseModel): targets: Target | None = None @@ -92,6 +104,13 @@ class Embedded(BaseModel): description="Application has app admin role", traversable=True, ), + EdgeDef( + start=nk.APPLICATION, + end=nk.CLIENT_SECRET, + kind=ek.READ_CLIENT_SECRET, + description="Application can read application client secrets", + traversable=True, + ), EdgeDef( start=nk.APPLICATION, end=nk.GROUP, @@ -193,25 +212,11 @@ class Embedded(BaseModel): ), ], ) -class ClientRoleAssignment(BaseAsset): +class ClientRoleAssignment(RoleAssignment): model_config = ConfigDict(populate_by_name=True) - id: str - from_resource: str - source_id: str - - assignment_type: str = Field(alias="assignmentType") - resource_set: str | None = Field(alias="resource-set", default=None) - status: str - created: datetime | None - name: str | None = None - label: str - last_updated: datetime | None = Field(alias="lastUpdated", default=None) - features: list[str] = Field(default_factory=list) - type: str - role: str | None = None - embedded: Embedded | None = Field(alias="_embedded", default=None) + links: Links | None = Field(alias="_links", default=None) @property def as_node(self): @@ -219,6 +224,7 @@ def as_node(self): kinds=[nk.ROLE_ASSIGNMENT], properties=ClientRoleAssignmentProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.label, displayname=self.label, @@ -231,148 +237,6 @@ def as_node(self): ), ) - @property - def _has_role_assignment_edges(self): - yield Edge( - kind=ek.HAS_ROLE_ASSIGNMENT, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.id, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _has_role_edges(self): - if self.type != "CUSTOM": - yield Edge( - kind=ek.HAS_ROLE, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.type, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - else: - yield Edge( - kind=ek.HAS_ROLE, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.role, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _add_member_edges(self): - if self.type == "CUSTOM" and self.role: - has_group_members_permission = self._lookup.has_role_permission( - self.role, "okta.groups.members.manage" - ) - has_group_manage_permissions = self._lookup.has_role_permission( - self.role, "okta.groups.manage" - ) - if has_group_manage_permissions or has_group_members_permission: - for (group_id,) in self._lookup.all_groups(): - yield Edge( - kind=ek.ADD_MEMBER, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=group_id, match_by="id"), - ) - # elif self.type == "SUPER_ADMIN" or ( - # BUILT_IN_PERMISSIONS.get(self.type) - # and ( - # "okta.groups.members.manage" in BUILT_IN_PERMISSIONS[self.type] - # or "okta.groups.manage" in BUILT_IN_PERMISSIONS[self.type] - # ) - # ): - # for (group_id,) in self._lookup.all_groups(): - # yield Edge( - # kind=ek.ADD_MEMBER, - # start=EdgePath(value=self.source_id, match_by="id"), - # end=EdgePath(value=group_id, match_by="id"), - # ) - - @property - def _scoped_to_org_edge(self): - org_wide_roles = [ - "SUPER_ADMIN", - "ORG_ADMIN", - "MOBILE_ADMIN", - "READ_ONLY_ADMIN", - "REPORT_ADMIN", - ] - if self.type != "CUSTOM" and self.type in org_wide_roles: - yield Edge( - kind=ek.SCOPED_TO, - start=EdgePath(value=self.id, match_by="id"), - end=EdgePath(value=self._lookup.org_id(), match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _scoped_to_group_edges(self): - if self.embedded and self.embedded.targets.groups: - for group in self.embedded.targets.groups: - yield Edge( - kind=ek.SCOPED_TO, - start=EdgePath(value=self.id, match_by="id"), - end=EdgePath(value=group.id, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _manage_app_edges(self): - if self.type == "CUSTOM" and self.role: - has_permissions = self._lookup.has_role_permission( - self.role, "okta.groups.manage" - ) - if has_permissions: - for (app_id,) in self._lookup.all_applications(): - yield Edge( - kind=ek.MANAGE_APP, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=app_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _reset_factors_edges(self): - if self.type == "CUSTOM" and self.role: - required_permissions = [ - "okta.users.credentials.resetFactors", - "okta.users.credentials.manage", - ] - has_permission = any( - self._lookup.has_role_permission(self.role, permission) - for permission in required_permissions - ) - if has_permission: - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.RESET_FACTORS, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _reset_password_edges(self): - if self.type == "CUSTOM" and self.role: - required_permissions = [ - "okta.users.credentials.resetPassword", - "okta.users.credentials.manage", - "okta.users.credentials.manageTemporaryAccessCode", - "okta.users.manage", - ] - has_permission = any( - self._lookup.has_role_permission(self.role, permission) - for permission in required_permissions - ) - - if has_permission: - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.RESET_PASSWORD, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - @property def _group_membership_admin_edges(self): if self.type == "GROUP_MEMBERSHIP_ADMIN": @@ -387,13 +251,30 @@ def _group_membership_admin_edges(self): @property def _app_admin_edges(self): if self.type == "APP_ADMIN": - for (app_id,) in self._lookup.all_applications(): - yield Edge( - kind=ek.APP_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=app_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) + if ( + self.embedded + and self.embedded.targets + and self.embedded.targets.catalog + and self.embedded.targets.catalog.apps + ): + # Emit only to scoped targets + for app in self.embedded.targets.catalog.apps: + if app.id: + yield Edge( + kind=ek.APP_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=app.id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + else: + for (app_id,) in self._lookup.all_applications(): + yield Edge( + kind=ek.APP_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=app_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) @property def _helpdesk_admin_edges(self): @@ -406,17 +287,6 @@ def _helpdesk_admin_edges(self): properties=EdgeProperties(traversable=True), ) - @property - def _mobile_admin_edges(self): - if self.type == "MOBILE_ADMIN": - for (device_id,) in self._lookup.all_devices(): - yield Edge( - kind=ek.MOBILE_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=device_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - @property def _org_admin_edges(self): if self.type == "ORG_ADMIN": @@ -442,15 +312,13 @@ def _org_admin_edges(self): properties=EdgeProperties(traversable=True), ) - @property - def _super_admin_edge(self): - if self.type == "SUPER_ADMIN": - yield Edge( - kind=ek.SUPER_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self._lookup.org_id(), match_by="id"), - properties=EdgeProperties(traversable=True), - ) + for (group_id,) in self._lookup.all_groups(): + yield Edge( + kind=ek.ORG_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=group_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) @property def _user_admin_edges(self): @@ -475,7 +343,6 @@ def _user_admin_edges(self): def edges(self): yield from self._has_role_assignment_edges yield from self._has_role_edges - yield from self._add_member_edges yield from self._app_admin_edges yield from self._group_membership_admin_edges yield from self._helpdesk_admin_edges @@ -488,3 +355,5 @@ def edges(self): yield from self._manage_app_edges yield from self._scoped_to_group_edges yield from self._scoped_to_org_edge + yield from self.read_client_secret_edges + yield from self.add_member_edges diff --git a/src/openhound_okta/models/custom_role.py b/src/openhound_okta/models/custom_role.py index bf6da8f..37d9f4b 100644 --- a/src/openhound_okta/models/custom_role.py +++ b/src/openhound_okta/models/custom_role.py @@ -57,6 +57,7 @@ def as_node(self): kinds=[nk.CUSTOM_ROLE], properties=CustomRoleProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.label, displayname=self.label, diff --git a/src/openhound_okta/models/device.py b/src/openhound_okta/models/device.py index e6875f4..10fee98 100644 --- a/src/openhound_okta/models/device.py +++ b/src/openhound_okta/models/device.py @@ -106,6 +106,7 @@ def as_node(self): kinds=[nk.DEVICE], properties=DeviceProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.profile.display_name, displayname=self.profile.display_name, diff --git a/src/openhound_okta/models/group.py b/src/openhound_okta/models/group.py index 80d789f..94bffbf 100644 --- a/src/openhound_okta/models/group.py +++ b/src/openhound_okta/models/group.py @@ -1,10 +1,12 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime from typing import ClassVar +from urllib.parse import urlparse +from dlt.common import json from dlt.common.libs.pydantic import DltConfig from openhound.core.asset import BaseAsset, EdgeDef, NodeDef -from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties +from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties, ConditionalEdgePath, PropertyMatch from pydantic import BaseModel, ConfigDict, Field from openhound_okta.graph import OktaNode, OktaNodeProperties @@ -20,9 +22,6 @@ class GroupProperties(OktaNodeProperties): created: datetime last_updated: datetime | None = None last_membership_updated: datetime | None = None - collected: bool = field( - default=True, metadata={"description": "Collected/generated by OpenHound"} - ) class GroupProfile(BaseModel): @@ -115,6 +114,7 @@ def as_node(self): kinds=[nk.GROUP], properties=GroupProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=profile_name or self.id, displayname=profile_name or self.id, @@ -129,8 +129,8 @@ def as_node(self): @property def _membership_sync_inbound_ad_edge(self): if ( - "okta:windows_security_principal" in self.object_class - and self.profile.object_sid + "okta:windows_security_principal" in self.object_class + and self.profile.object_sid ): yield Edge( kind=ek.MEMBERSHIP_SYNC, @@ -140,11 +140,10 @@ def _membership_sync_inbound_ad_edge(self): ) @property - def _membership_sync_inbound_app_edge(self): - # TODO: Validate logic if this is just base on source id, there is no existing edge in the test tenant - if self.type == "APP_GROUP" and "okta:user_group" in self.object_class: + def _group_pull_edge(self): + if self.source and self.source.id and self._lookup.application_by_id(self.source.id): yield Edge( - kind=ek.MEMBERSHIP_SYNC, + kind=ek.GROUP_PULL, start=EdgePath(value=self.source.id, match_by="id"), end=EdgePath(value=self.id, match_by="id"), properties=EdgeProperties(traversable=True), @@ -159,8 +158,35 @@ def _contains_edges(self): properties=EdgeProperties(traversable=False), ) + @property + def _membership_sync_inbound_app_edge(self): + if self.type == "APP_GROUP" and "okta:user_group" in self.object_class: + app_settings = self._lookup.application_settings(self.source.id) + if app_settings: + app_settings_obj = json.loads(app_settings) + source_domain = urlparse(app_settings_obj["app"]["baseUrl"]).netloc + yield Edge( + kind=ek.MEMBERSHIP_SYNC, + start=ConditionalEdgePath( + kind=nk.GROUP, property_matchers=[ + PropertyMatch( + key="tenant_domain", value=source_domain + ), + PropertyMatch( + key="type", value="OKTA_GROUP" + ), + PropertyMatch( + key="name", value=self.profile.name.upper() + ) + ] + ), + end=EdgePath(value=self.id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + @property def edges(self): + yield from self._membership_sync_inbound_app_edge yield from self._contains_edges yield from self._membership_sync_inbound_ad_edge - yield from self._membership_sync_inbound_app_edge + yield from self._group_pull_edge diff --git a/src/openhound_okta/models/group_role_assignment.py b/src/openhound_okta/models/group_role_assignment.py index cbe2f10..2ec3458 100644 --- a/src/openhound_okta/models/group_role_assignment.py +++ b/src/openhound_okta/models/group_role_assignment.py @@ -1,15 +1,15 @@ from dataclasses import dataclass from datetime import datetime -from openhound.core.asset import BaseAsset, EdgeDef, NodeDef +from openhound.core.asset import EdgeDef, NodeDef from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties +from pydantic import BaseModel from pydantic import ConfigDict, Field from openhound_okta.graph import OktaNode, OktaNodeProperties from openhound_okta.kinds import edges as ek, nodes as nk from openhound_okta.main import app - -from pydantic import BaseModel +from openhound_okta.models.role_assignment import RoleAssignment @dataclass @@ -45,6 +45,17 @@ class Target(BaseModel): groups: list[Group] | None = None +class HREF(BaseModel): + href: str + + +class Links(BaseModel): + source: HREF | None = None + users: HREF | None = None + apps: HREF | None = None + groups: HREF | None = None + + class Embedded(BaseModel): targets: Target | None = None @@ -107,6 +118,13 @@ class Embedded(BaseModel): description="Group has app admin role", traversable=True, ), + EdgeDef( + start=nk.GROUP, + end=nk.CLIENT_SECRET, + kind=ek.READ_CLIENT_SECRET, + description="Group can read application client secrets", + traversable=True, + ), EdgeDef( start=nk.GROUP, end=nk.GROUP, @@ -194,24 +212,11 @@ class Embedded(BaseModel): ), ], ) -class GroupRoleAssignment(BaseAsset): +class GroupRoleAssignment(RoleAssignment): model_config = ConfigDict(populate_by_name=True) - id: str - from_resource: str - source_id: str - - assignment_type: str = Field(alias="assignmentType") - resource_set: str | None = Field(alias="resource-set", default=None) - status: str - created: datetime | None - name: str | None = None - label: str - last_updated: datetime | None = Field(alias="lastUpdated", default=None) - features: list[str] = Field(default_factory=list) - type: str - role: str | None = None embedded: Embedded | None = Field(alias="_embedded", default=None) + links: Links | None = Field(alias="_links", default=None) @property def as_node(self): @@ -219,6 +224,7 @@ def as_node(self): kinds=[nk.ROLE_ASSIGNMENT], properties=GroupRoleAssignmentProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.label, displayname=self.label, @@ -231,120 +237,6 @@ def as_node(self): ), ) - @property - def _has_role_assignment_edges(self): - yield Edge( - kind=ek.HAS_ROLE_ASSIGNMENT, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.id, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _has_role_edges(self): - if self.type != "CUSTOM": - yield Edge( - kind=ek.HAS_ROLE, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.type, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - else: - yield Edge( - kind=ek.HAS_ROLE, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.role, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _add_member_edges(self): - if self.type == "CUSTOM" and self.role: - has_group_members_permission = self._lookup.has_role_permission( - self.role, "okta.groups.members.manage" - ) - has_group_manage_permissions = self._lookup.has_role_permission( - self.role, "okta.groups.manage" - ) - if has_group_manage_permissions or has_group_members_permission: - for (group_id,) in self._lookup.all_groups(): - yield Edge( - kind=ek.ADD_MEMBER, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=group_id, match_by="id"), - ) - # elif self.type == "SUPER_ADMIN" or ( - # BUILT_IN_PERMISSIONS.get(self.type) - # and ( - # "okta.groups.members.manage" in BUILT_IN_PERMISSIONS[self.type] - # or "okta.groups.manage" in BUILT_IN_PERMISSIONS[self.type] - # ) - # ): - # for (group_id,) in self._lookup.all_groups(): - # yield Edge( - # kind=ek.ADD_MEMBER, - # start=EdgePath(value=self.source_id, match_by="id"), - # end=EdgePath(value=group_id, match_by="id"), - # ) - - @property - def _manage_app_edges(self): - if self.type == "CUSTOM" and self.role: - has_permissions = self._lookup.has_role_permission( - self.role, "okta.groups.manage" - ) - if has_permissions: - for (app_id,) in self._lookup.all_applications(): - yield Edge( - kind=ek.MANAGE_APP, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=app_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _reset_factors_edges(self): - if self.type == "CUSTOM" and self.role: - required_permissions = [ - "okta.users.credentials.resetFactors", - "okta.users.credentials.manage", - ] - has_permission = any( - self._lookup.has_role_permission(self.role, permission) - for permission in required_permissions - ) - if has_permission: - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.RESET_FACTORS, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _reset_password_edges(self): - if self.type == "CUSTOM" and self.role: - required_permissions = [ - "okta.users.credentials.resetPassword", - "okta.users.credentials.manage", - "okta.users.credentials.manageTemporaryAccessCode", - "okta.users.manage", - ] - has_permission = any( - self._lookup.has_role_permission(self.role, permission) - for permission in required_permissions - ) - - if has_permission: - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.RESET_PASSWORD, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - @property def _group_membership_admin_edges(self): if self.type == "GROUP_MEMBERSHIP_ADMIN": @@ -370,24 +262,23 @@ def _app_admin_edges(self): @property def _helpdesk_admin_edges(self): if self.type == "HELP_DESK_ADMIN": - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.HELPDESK_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _mobile_admin_edges(self): - if self.type == "MOBILE_ADMIN": - for (device_id,) in self._lookup.all_devices(): - yield Edge( - kind=ek.MOBILE_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=device_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) + if self.embedded and self.embedded.targets and self.embedded.targets.groups: + for group in self.embedded.targets.groups: + yield Edge( + kind=ek.HELPDESK_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=group.id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + else: + # No targets specified, emit to all users + for (user_id,) in self._lookup.all_users(): + yield Edge( + kind=ek.HELPDESK_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=user_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) @property def _user_admin_edges(self): @@ -430,52 +321,20 @@ def _org_admin_edges(self): kind=ek.ORG_ADMIN, start=EdgePath(value=self.source_id, match_by="id"), end=EdgePath(value=app_id, match_by="id"), - properties=EdgeProperties(traversable=True), + properties=EdgeProperties(traversable=True) ) - - @property - def _scoped_to_org_edge(self): - org_wide_roles = [ - "SUPER_ADMIN", - "ORG_ADMIN", - "MOBILE_ADMIN", - "READ_ONLY_ADMIN", - "REPORT_ADMIN", - ] - if self.type != "CUSTOM" and self.type in org_wide_roles: - yield Edge( - kind=ek.SCOPED_TO, - start=EdgePath(value=self.id, match_by="id"), - end=EdgePath(value=self._lookup.org_id(), match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _scoped_to_group_edges(self): - if self.embedded and self.embedded.targets.groups: - for group in self.embedded.targets.groups: + for (group_id,) in self._lookup.all_groups(): yield Edge( - kind=ek.SCOPED_TO, - start=EdgePath(value=self.id, match_by="id"), - end=EdgePath(value=group.id, match_by="id"), - properties=EdgeProperties(traversable=False), + kind=ek.ORG_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=group_id, match_by="id"), + properties=EdgeProperties(traversable=True), ) - @property - def _super_admin_edge(self): - if self.type == "SUPER_ADMIN": - yield Edge( - kind=ek.SUPER_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self._lookup.org_id(), match_by="id"), - properties=EdgeProperties(traversable=True), - ) - @property def edges(self): yield from self._has_role_assignment_edges yield from self._has_role_edges - yield from self._add_member_edges yield from self._app_admin_edges yield from self._group_membership_admin_edges yield from self._helpdesk_admin_edges @@ -488,3 +347,5 @@ def edges(self): yield from self._manage_app_edges yield from self._scoped_to_group_edges yield from self._scoped_to_org_edge + yield from self.read_client_secret_edges + yield from self.add_member_edges diff --git a/src/openhound_okta/models/idp.py b/src/openhound_okta/models/idp.py index 9fc4068..ef59ff8 100644 --- a/src/openhound_okta/models/idp.py +++ b/src/openhound_okta/models/idp.py @@ -166,6 +166,7 @@ def as_node(self): kinds=[nk.IDP], properties=IdentityProviderProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, diff --git a/src/openhound_okta/models/organization.py b/src/openhound_okta/models/organization.py index 9fdd689..cc2ee2b 100644 --- a/src/openhound_okta/models/organization.py +++ b/src/openhound_okta/models/organization.py @@ -49,6 +49,7 @@ def as_node(self): kinds=[nk.ORG], properties=OrganizationProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.company_name or self.subdomain, displayname=self.company_name or self.subdomain, diff --git a/src/openhound_okta/models/policy.py b/src/openhound_okta/models/policy.py index a7733fc..a0f86a8 100644 --- a/src/openhound_okta/models/policy.py +++ b/src/openhound_okta/models/policy.py @@ -61,6 +61,7 @@ def as_node(self): kinds=[nk.POLICY], properties=PolicyProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.name, displayname=self.name, diff --git a/src/openhound_okta/models/realm.py b/src/openhound_okta/models/realm.py index 71cb82d..494b84a 100644 --- a/src/openhound_okta/models/realm.py +++ b/src/openhound_okta/models/realm.py @@ -61,6 +61,7 @@ def as_node(self): kinds=[nk.REALM], properties=RealmProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.profile.name, displayname=self.profile.name, diff --git a/src/openhound_okta/models/resource.py b/src/openhound_okta/models/resource.py index 4984d08..2b2cd6f 100644 --- a/src/openhound_okta/models/resource.py +++ b/src/openhound_okta/models/resource.py @@ -2,7 +2,7 @@ from openhound.core.asset import BaseAsset, EdgeDef from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties -from pydantic import ConfigDict +from pydantic import ConfigDict, Field from openhound_okta.kinds import edges as ek, nodes as nk from openhound_okta.main import app @@ -72,9 +72,10 @@ class Resource(BaseAsset): model_config = ConfigDict(populate_by_name=True) - id: str + id: str | None = None orn: str created: datetime | None = None + links: dict | None = Field(default=None, alias="_links") # Additional resource_set_id: str diff --git a/src/openhound_okta/models/resource_set.py b/src/openhound_okta/models/resource_set.py index 712f316..bc19516 100644 --- a/src/openhound_okta/models/resource_set.py +++ b/src/openhound_okta/models/resource_set.py @@ -65,6 +65,7 @@ def as_node(self): description=self.description, last_updated=self.last_updated, environmentid=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], ), ) diff --git a/src/openhound_okta/models/role_assignment.py b/src/openhound_okta/models/role_assignment.py new file mode 100644 index 0000000..536b97d --- /dev/null +++ b/src/openhound_okta/models/role_assignment.py @@ -0,0 +1,246 @@ +from datetime import datetime +from typing import Any + +from openhound.core.asset import BaseAsset +from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties +from pydantic import Field + +from openhound_okta.kinds import edges as ek + +DIRECT_ASSIGNMENT_TYPES = { + "user": "USER", + "group": "GROUP", + "client": "CLIENT", +} + +ADD_MEMBER_PERMISSIONS = ( + "okta.groups.manage", + "okta.groups.members.manage", +) + + +class RoleAssignment(BaseAsset): + id: str + from_resource: str + source_id: str + assignment_type: str = Field(alias="assignmentType") + resource_set: str | None = Field(alias="resource-set", default=None) + status: str + created: datetime | None + name: str | None = None + label: str + last_updated: datetime | None = Field(alias="lastUpdated", default=None) + features: list[str] = Field(default_factory=list) + type: str + role: str | None = None + embedded: Any = Field(alias="_embedded", default=None) + links: Any = Field(alias="_links", default=None) + + @property + def _has_role_assignment_edges(self): + yield Edge( + kind=ek.HAS_ROLE_ASSIGNMENT, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=self.id, match_by="id"), + properties=EdgeProperties(traversable=False), + ) + + @property + def _has_role_edges(self): + if self.type != "CUSTOM": + yield Edge( + kind=ek.HAS_ROLE, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=self.type, match_by="id"), + properties=EdgeProperties(traversable=False), + ) + else: + yield Edge( + kind=ek.HAS_ROLE, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=self.role, match_by="id"), + properties=EdgeProperties(traversable=False), + ) + + @property + def _manage_app_edges(self): + if self.type == "CUSTOM" and self.role: + has_permissions = self._lookup.has_role_permission( + self.role, "okta.groups.manage" + ) + if has_permissions: + for (app_id,) in self._lookup.all_applications(): + yield Edge( + kind=ek.MANAGE_APP, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=app_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + @property + def _reset_factors_edges(self): + if self.type == "CUSTOM" and self.role: + required_permissions = [ + "okta.users.credentials.resetFactors", + "okta.users.credentials.manage", + ] + has_permission = any( + self._lookup.has_role_permission(self.role, permission) + for permission in required_permissions + ) + if has_permission: + for (user_id,) in self._lookup.all_users(): + yield Edge( + kind=ek.RESET_FACTORS, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=user_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + @property + def _reset_password_edges(self): + if self.type == "CUSTOM" and self.role: + required_permissions = [ + "okta.users.credentials.resetPassword", + "okta.users.credentials.manage", + "okta.users.credentials.manageTemporaryAccessCode", + "okta.users.manage", + ] + has_permission = any( + self._lookup.has_role_permission(self.role, permission) + for permission in required_permissions + ) + + if has_permission: + for (user_id,) in self._lookup.all_users(): + yield Edge( + kind=ek.RESET_PASSWORD, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=user_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + @property + def _scoped_to_org_edge(self): + org_wide_roles = [ + "SUPER_ADMIN", + "ORG_ADMIN", + "MOBILE_ADMIN", + "READ_ONLY_ADMIN", + "REPORT_ADMIN", + ] + if self.type != "CUSTOM" and self.type in org_wide_roles: + yield Edge( + kind=ek.SCOPED_TO, + start=EdgePath(value=self.id, match_by="id"), + end=EdgePath(value=self._lookup.org_id(), match_by="id"), + properties=EdgeProperties(traversable=False), + ) + + @property + def _scoped_to_group_edges(self): + if self.embedded and self.embedded.targets and self.embedded.targets.groups: + for group in self.embedded.targets.groups: + yield Edge( + kind=ek.SCOPED_TO, + start=EdgePath(value=self.id, match_by="id"), + end=EdgePath(value=group.id, match_by="id"), + properties=EdgeProperties(traversable=False), + ) + + @property + def _mobile_admin_edges(self): + if self.type == "MOBILE_ADMIN": + for (device_id,) in self._lookup.all_devices(): + yield Edge( + kind=ek.MOBILE_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=device_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + @property + def _super_admin_edge(self): + if self.type == "SUPER_ADMIN": + yield Edge( + kind=ek.SUPER_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=self._lookup.org_id(), match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + @property + def add_member_edges(self): + expected_assignment_type = DIRECT_ASSIGNMENT_TYPES.get(self.from_resource) + if ( + self.type != "CUSTOM" + or not self.role + or self.status != "ACTIVE" + or self.assignment_type != expected_assignment_type + or not self.resource_set + ): + return + + has_permission = any( + self._lookup.has_role_permission(self.role, permission) + for permission in ADD_MEMBER_PERMISSIONS + ) + if not has_permission: + return + + for group_id in self._lookup.resource_set_non_admin_group_ids(self.resource_set): + yield Edge( + kind=ek.ADD_MEMBER, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=group_id, match_by="id"), + ) + + @property + def read_client_secret_edges(self): + if self.type == "APP_ADMIN": + embedded = self.embedded + if ( + embedded + and embedded.targets + and embedded.targets.catalog + and embedded.targets.catalog.apps + ): + app_ids = [app.id for app in embedded.targets.catalog.apps if app.id] + else: + app_ids = [app_id for (app_id,) in self._lookup.all_applications()] + + for app_id in app_ids: + for (secret_id,) in self._lookup.application_secret_ids(app_id): + yield Edge( + kind=ek.READ_CLIENT_SECRET, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=secret_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + elif self.type in ["API_ACCESS_MANAGEMENT_ADMIN", "READ_ONLY_ADMIN"]: + for (app_id,) in self._lookup.all_applications(): + for (secret_id,) in self._lookup.application_secret_ids(app_id): + yield Edge( + kind=ek.READ_CLIENT_SECRET, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=secret_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) + + elif ( + self.type == "CUSTOM" + and self.role + and self.resource_set + and self._lookup.has_role_permission( + self.role, "okta.apps.clientCredentials.read" + ) + ): + for app_id in self._lookup.resource_set_application_ids(self.resource_set): + for (secret_id,) in self._lookup.application_secret_ids(app_id): + yield Edge( + kind=ek.READ_CLIENT_SECRET, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=secret_id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) diff --git a/src/openhound_okta/models/user.py b/src/openhound_okta/models/user.py index 8b6aefb..fcf8f3b 100644 --- a/src/openhound_okta/models/user.py +++ b/src/openhound_okta/models/user.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime from typing import ClassVar @@ -40,9 +40,6 @@ class UserProperties(OktaNodeProperties): manager_id: str | None = None credential_provider_type: str | None = None credential_provider_name: str | None = None - collected: bool = field( - default=True, metadata={"description": "Collected/generated by OpenHound"} - ) class Provider(BaseModel): @@ -131,6 +128,7 @@ def as_node(self): kinds=[nk.USER], properties=UserProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.profile.login, displayname=self.profile.login @@ -189,14 +187,17 @@ def _realm_contains_edges(self): @property def _manager_of_edges(self): - if self.profile and self.profile.manager_id: + if ( + self.profile + and self.profile.manager_id + ): manager_id = self._lookup.manager_id(self.profile.manager_id) if manager_id: yield Edge( kind=ek.MANAGER_OF, start=EdgePath(value=manager_id, match_by="id"), end=EdgePath(value=self.id, match_by="id"), - properties=EdgeProperties(traversable=False), + properties=EdgeProperties(traversable=True), ) @property diff --git a/src/openhound_okta/models/user_role_assignment.py b/src/openhound_okta/models/user_role_assignment.py index 8362bc9..e8d0767 100644 --- a/src/openhound_okta/models/user_role_assignment.py +++ b/src/openhound_okta/models/user_role_assignment.py @@ -1,14 +1,15 @@ from dataclasses import dataclass from datetime import datetime -from openhound.core.asset import BaseAsset, EdgeDef, NodeDef +from openhound.core.asset import EdgeDef, NodeDef from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties +from pydantic import BaseModel from pydantic import ConfigDict, Field from openhound_okta.graph import OktaNode, OktaNodeProperties from openhound_okta.kinds import edges as ek, nodes as nk from openhound_okta.main import app -from pydantic import BaseModel +from openhound_okta.models.role_assignment import RoleAssignment @dataclass @@ -44,6 +45,17 @@ class Target(BaseModel): groups: list[Group] | None = None +class HREF(BaseModel): + href: str + + +class Links(BaseModel): + source: HREF | None = None + users: HREF | None = None + apps: HREF | None = None + groups: HREF | None = None + + class Embedded(BaseModel): targets: Target | None = None @@ -184,6 +196,13 @@ class Embedded(BaseModel): description="Application has app admin role", traversable=True, ), + EdgeDef( + start=nk.USER, + end=nk.CLIENT_SECRET, + kind=ek.READ_CLIENT_SECRET, + description="User can read application client secrets", + traversable=True, + ), # Group Membership Role EdgeDef( start=nk.GROUP, @@ -376,31 +395,11 @@ class Embedded(BaseModel): ), ], ) -class UserRoleAssignment(BaseAsset): +class UserRoleAssignment(RoleAssignment): model_config = ConfigDict(populate_by_name=True) - # Base response via when listing assignments - id: str - - # Additional - from_resource: str - source_id: str - - # Details when fetching user/group - assignment_type: str = Field(alias="assignmentType") - resource_set: str | None = Field(alias="resource-set", default=None) - status: str - created: datetime | None - name: str | None = None - label: str - status: str - last_updated: datetime | None = Field(alias="lastUpdated", default=None) - created: datetime | None - features: list[str] = Field(default_factory=list) - type: str - role: str | None = None - embedded: Embedded | None = Field(alias="_embedded", default=None) + links: Links | None = Field(alias="_links", default=None) @property def as_node(self): @@ -408,6 +407,7 @@ def as_node(self): kinds=[nk.ROLE_ASSIGNMENT], properties=UserRoleAssignmentProperties( tenant=self._lookup.org_id(), + tenant_domain=self._extras["tenant"], id=self.id, name=self.label, displayname=self.label, @@ -420,148 +420,13 @@ def as_node(self): ), ) - @property - def _has_role_assignment_edges(self): - yield Edge( - kind=ek.HAS_ROLE_ASSIGNMENT, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.id, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _has_role_edges(self): - if self.type != "CUSTOM": - yield Edge( - kind=ek.HAS_ROLE, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.type, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - else: - yield Edge( - kind=ek.HAS_ROLE, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self.role, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - - @property - def _manage_app_edges(self): - if self.type == "CUSTOM" and self.role: - has_permissions = self._lookup.has_role_permission( - self.role, "okta.groups.manage" - ) - if has_permissions: - for (app_id,) in self._lookup.all_applications(): - yield Edge( - kind=ek.MANAGE_APP, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=app_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _reset_factors_edges(self): - if self.type == "CUSTOM" and self.role: - required_permissions = [ - "okta.users.credentials.resetFactors", - "okta.users.credentials.manage", - ] - has_permission = any( - self._lookup.has_role_permission(self.role, permission) - for permission in required_permissions - ) - if has_permission: - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.RESET_FACTORS, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _reset_password_edges(self): - if self.type == "CUSTOM" and self.role: - required_permissions = [ - "okta.users.credentials.resetPassword", - "okta.users.credentials.manage", - "okta.users.credentials.manageTemporaryAccessCode", - "okta.users.manage", - ] - has_permission = any( - self._lookup.has_role_permission(self.role, permission) - for permission in required_permissions - ) - - if has_permission: - for (user_id,) in self._lookup.all_users(): - yield Edge( - kind=ek.RESET_PASSWORD, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=user_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - - @property - def _add_member_edges(self): - # TODO: The custom role check does not work yet - if self.type == "CUSTOM" and self.role: - has_group_members_permission = self._lookup.has_role_permission( - self.role, "okta.groups.members.manage" - ) - has_group_manage_permissions = self._lookup.has_role_permission( - self.role, "okta.groups.manage" - ) - if has_group_manage_permissions or has_group_members_permission: - all_groups = self._lookup.all_groups() - for (group_id,) in all_groups: - yield Edge( - kind=ek.ADD_MEMBER, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=group_id, match_by="id"), - ) - # else: - # if self.type == "SUPER_ADMIN" or ( - # BUILT_IN_PERMISSIONS.get(self.type) - # and ( - # "okta.groups.members.manage" in BUILT_IN_PERMISSIONS[self.type] - # or ["okta.groups.manage"] in BUILT_IN_PERMISSIONS[self.type] - # ) - # ): - # all_groups = self._lookup.all_groups() - # for (group_id,) in all_groups: - # yield Edge( - # kind=ek.ADD_MEMBER, - # start=EdgePath(value=self.source_id, match_by="id"), - # end=EdgePath(value=group_id, match_by="id"), - # ) - - @property - def _scoped_to_org_edge(self): - org_wide_roles = [ - "SUPER_ADMIN", - "ORG_ADMIN", - "MOBILE_ADMIN", - "READ_ONLY_ADMIN", - "REPORT_ADMIN", - ] - if self.type != "CUSTOM" and self.type in org_wide_roles: - yield Edge( - kind=ek.SCOPED_TO, - start=EdgePath(value=self.id, match_by="id"), - end=EdgePath(value=self._lookup.org_id(), match_by="id"), - properties=EdgeProperties(traversable=False), - ) - @property def _scoped_to_app_edges(self): if ( - self.embedded - and self.embedded.targets - and self.embedded.targets.catalog - and self.embedded.targets.catalog.apps + self.embedded + and self.embedded.targets + and self.embedded.targets.catalog + and self.embedded.targets.catalog.apps ): for app in self.embedded.targets.catalog.apps: if app.id: @@ -572,17 +437,6 @@ def _scoped_to_app_edges(self): properties=EdgeProperties(traversable=False), ) - @property - def _scoped_to_group_edges(self): - if self.embedded and self.embedded.targets and self.embedded.targets.groups: - for group in self.embedded.targets.groups: - yield Edge( - kind=ek.SCOPED_TO, - start=EdgePath(value=self.id, match_by="id"), - end=EdgePath(value=group.id, match_by="id"), - properties=EdgeProperties(traversable=False), - ) - @property def _group_membership_admin_edges(self): """ @@ -615,18 +469,16 @@ def _group_membership_admin_edges(self): @property def _app_admin_edges(self): """ - APP_ADMIN permission edges: (:Assignee)-[:Okta_AppAdmin]->(:Application|:ApiServiceIntegration) - If role has specific app targets, emit edges only to those apps. - If no targets, emit to all apps in the organization. - Apps/ApiServiceIntegrations with role assignments cannot be managed by APP_ADMIN. + APP_ADMIN permission edges: (:Assignee)-[:Okta_AppAdmin]->(:Application) + Emit edges to all apps in the organization. """ if self.type == "APP_ADMIN": # Get targets from embedded data, or all apps if no targets if ( - self.embedded - and self.embedded.targets - and self.embedded.targets.catalog - and self.embedded.targets.catalog.apps + self.embedded + and self.embedded.targets + and self.embedded.targets.catalog + and self.embedded.targets.catalog.apps ): # Emit only to scoped targets for app in self.embedded.targets.catalog.apps: @@ -657,12 +509,13 @@ def _helpdesk_admin_edges(self): """ if self.type == "HELP_DESK_ADMIN": if self.embedded and self.embedded.targets and self.embedded.targets.groups: - # Emit to users in scoped target groups - # TODO: Resolve group members from embedded target group IDs for group in self.embedded.targets.groups: - # For now, emit edge to the group itself as a placeholder - # Full implementation would need to resolve all users in the group - pass + yield Edge( + kind=ek.HELPDESK_ADMIN, + start=EdgePath(value=self.source_id, match_by="id"), + end=EdgePath(value=group.id, match_by="id"), + properties=EdgeProperties(traversable=True), + ) else: # No targets specified, emit to all users all_users = self._lookup.all_users() @@ -674,18 +527,6 @@ def _helpdesk_admin_edges(self): properties=EdgeProperties(traversable=True), ) - @property - def _mobile_admin_edges(self): - if self.type == "MOBILE_ADMIN": - all_devices = self._lookup.all_devices() - for (device_id,) in all_devices: - yield Edge( - kind=ek.MOBILE_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=device_id, match_by="id"), - properties=EdgeProperties(traversable=True), - ) - @property def _org_admin_edges(self): """ @@ -697,19 +538,19 @@ def _org_admin_edges(self): """ if self.type == "ORG_ADMIN": has_targets = ( - self.embedded - and self.embedded.targets - and ( - ( - self.embedded.targets.groups - and len(self.embedded.targets.groups) > 0 - ) - or ( - self.embedded.targets.catalog - and self.embedded.targets.catalog.apps - and len(self.embedded.targets.catalog.apps) > 0 + self.embedded + and self.embedded.targets + and ( + ( + self.embedded.targets.groups + and len(self.embedded.targets.groups) > 0 + ) + or ( + self.embedded.targets.catalog + and self.embedded.targets.catalog.apps + and len(self.embedded.targets.catalog.apps) > 0 + ) ) - ) ) if has_targets: @@ -770,16 +611,6 @@ def _org_admin_edges(self): properties=EdgeProperties(traversable=True), ) - @property - def _super_admin_edge(self): - if self.type == "SUPER_ADMIN": - yield Edge( - kind=ek.SUPER_ADMIN, - start=EdgePath(value=self.source_id, match_by="id"), - end=EdgePath(value=self._lookup.org_id(), match_by="id"), - properties=EdgeProperties(traversable=True), - ) - @property def _user_admin_edges(self): """ @@ -820,7 +651,6 @@ def _user_admin_edges(self): def edges(self): yield from self._has_role_assignment_edges yield from self._has_role_edges - yield from self._add_member_edges yield from self._app_admin_edges yield from self._group_membership_admin_edges yield from self._helpdesk_admin_edges @@ -834,3 +664,5 @@ def edges(self): yield from self._scoped_to_app_edges yield from self._scoped_to_group_edges yield from self._scoped_to_org_edge + yield from self.read_client_secret_edges + yield from self.add_member_edges diff --git a/src/openhound_okta/source.py b/src/openhound_okta/source.py index ec967a8..1afb4b2 100644 --- a/src/openhound_okta/source.py +++ b/src/openhound_okta/source.py @@ -291,11 +291,6 @@ def applications(ctx: SourceContext): @app.transformer(name="application_jwks", columns=ApplicationJWKS, parallelized=True) def application_jwks(application: Application, ctx: SourceContext): - # TODO: This is a dedicated API endpoint to get the JWKs, not sure if the embedded keys have a max/limit - # for page in ctx.pool.paginate(f"/api/v1/apps/{application.id}/credentials/jwks"): - # yield page - # for key in application.jwk: - oauth_client = application.settings.oauth_client if oauth_client and oauth_client.jwks: for key in oauth_client.jwks.keys: @@ -314,7 +309,7 @@ def application_jwks(application: Application, ctx: SourceContext): def application_group_push_mappings(application: Application, ctx: SourceContext): if "GROUP_PUSH" in application.features: for page in ctx.pool.paginate( - f"/api/v1/apps/{application.id}/group-push/mappings" + f"/api/v1/apps/{application.id}/group-push/mappings" ): for item in page: yield {"app_id": application.id, "app_name": application.name, **item} @@ -326,11 +321,11 @@ def application_group_push_mappings(application: Application, ctx: SourceContext def application_secrets(application: Application, ctx: SourceContext): oauth_client = application.credentials.oauth_client if ( - oauth_client - and oauth_client.token_endpoint_auth_method == "client_secret_basic" + oauth_client + and oauth_client.token_endpoint_auth_method == "client_secret_basic" ): for page in ctx.pool.paginate( - f"/api/v1/apps/{application.id}/credentials/secrets" + f"/api/v1/apps/{application.id}/credentials/secrets" ): for item in page: yield {"app_id": application.id, "app_name": application.name, **item} @@ -372,7 +367,7 @@ def client_applications(ctx: SourceContext): def client_role_assignments(client: ClientApplication, ctx: SourceContext): if client.application_type == "service": for page in ctx.pool.paginate( - f"/oauth2/v1/clients/{client.client_id}/roles?expand=targets/catalog/apps&expand=targets/groups" + f"/oauth2/v1/clients/{client.client_id}/roles?expand=targets/catalog/apps&expand=targets/groups" ): for item in page: yield {"from_resource": "client", "source_id": client.client_id, **item} @@ -414,7 +409,7 @@ def _assignee_details(user_id: str): def group_role_assignments(group: Group, ctx: SourceContext): if group.embedded.stats.has_admin_privilege: for page in ctx.pool.paginate( - f"/api/v1/groups/{group.id}/roles?expand=targets/catalog/apps&expand=targets/groups" + f"/api/v1/groups/{group.id}/roles?expand=targets/catalog/apps&expand=targets/groups" ): for role in page: yield {"from_resource": "group", "source_id": group.id, **role} @@ -602,7 +597,7 @@ def resource_sets(ctx: SourceContext): @app.transformer(name="resources", columns=Resource, parallelized=True) def resources(resource_set: ResourceSet, ctx: SourceContext): for page in ctx.pool.paginate( - f"api/v1/iam/resource-sets/{resource_set.id}/resources" + f"api/v1/iam/resource-sets/{resource_set.id}/resources" ): for item in page: yield {"resource_set_id": resource_set.id, **item} @@ -633,9 +628,9 @@ def api_services(ctx: SourceContext): @app.source(name="okta", max_table_nesting=0) def source( - credentials: Union[ - OktaAppCredentials, OktaEncodedAppCredentials, OktaTokenCredentials - ] = dlt.secrets.value, + credentials: Union[ + OktaAppCredentials, OktaEncodedAppCredentials, OktaTokenCredentials + ] = dlt.secrets.value, ) -> tuple: """DLT source, defines Okta collection resources and transformers. diff --git a/src/openhound_okta/utils/auth.py b/src/openhound_okta/utils/auth.py index da133cd..106085e 100644 --- a/src/openhound_okta/utils/auth.py +++ b/src/openhound_okta/utils/auth.py @@ -29,35 +29,11 @@ def generate_private_key(self) -> None: def private_key(self) -> dict: """Load the private key from the specified path.""" - # TODO: Generate a key if the path does not exist. Move this to a CLI-only option - # if not self.private_key_path.exists(): - # logger.warning( - # f"Private key not found. Generating a new private key at {self.private_key_path}." - # ) - # self.generate_private_key() - # public_key = self.public_key(self.private_key_path.read_text()).strip() - # rprint( - # "[bold red]Public key generated.[/bold red] Please upload the following public key to Okta:" - # ) - # print(public_key) - # sys.exit(1) - logger.info(f"Loaded private key from {self.private_key_path}") return json.loads(self.private_key_path.read_text()) - # def public_key(self, private_key_content: str) -> str: - # """Returns public key based on private key path - # - # Returns: - # str: The public key in PEM format. - # """ - # private_key_import = RSAKey.import_key(private_key_content) - # public_key_jwk = private_key_import.as_dict(private=False) - # public_key_jwk["kid"] = private_key_import.thumbprint() - # return json.dumps(public_key_jwk) - def jwt( - self, private_key: dict, client_id: str, audience: str, exp_delta: int = 60 + self, private_key: dict, client_id: str, audience: str, exp_delta: int = 60 ) -> str: """Returns a JWT token for Okta authentication