From 458da621f7523916d8987f514b4b9216f1dd3648 Mon Sep 17 00:00:00 2001 From: John Walstra Date: Fri, 20 Mar 2026 16:29:09 -0500 Subject: [PATCH] Update keeper_dag and dicovery_common modules. --- .../discovery_common/__version__.py | 2 +- .../discovery_common/infrastructure.py | 93 +- keepercommander/discovery_common/jobs.py | 22 +- keepercommander/discovery_common/process.py | 5 +- keepercommander/discovery_common/rm_types.py | 39 +- keepercommander/discovery_common/types.py | 108 +- .../discovery_common/user_service.py | 968 +++++++++++------- keepercommander/discovery_common/utils.py | 60 +- keepercommander/keeper_dag/__version__.py | 2 +- .../keeper_dag/connection/local.py | 3 + keepercommander/keeper_dag/dag.py | 10 +- keepercommander/keeper_dag/edge.py | 10 +- keepercommander/keeper_dag/vertex.py | 14 +- 13 files changed, 869 insertions(+), 467 deletions(-) diff --git a/keepercommander/discovery_common/__version__.py b/keepercommander/discovery_common/__version__.py index bc50bee68..1bdaf4702 100644 --- a/keepercommander/discovery_common/__version__.py +++ b/keepercommander/discovery_common/__version__.py @@ -1 +1 @@ -__version__ = '1.1.4' +__version__ = '1.1.10' diff --git a/keepercommander/discovery_common/infrastructure.py b/keepercommander/discovery_common/infrastructure.py index 61147bf33..c396baec8 100644 --- a/keepercommander/discovery_common/infrastructure.py +++ b/keepercommander/discovery_common/infrastructure.py @@ -4,11 +4,12 @@ from ..keeper_dag import DAG, EdgeType from ..keeper_dag.exceptions import DAGVertexException from ..keeper_dag.crypto import urlsafe_str_to_bytes -from ..keeper_dag.types import PamGraphId, PamEndpoints +from ..keeper_dag.types import PamGraphId +from discovery_common.types import DiscoveryObject import os import importlib import time -from typing import Any, Optional, TYPE_CHECKING +from typing import Any, Optional, Dict, List, TYPE_CHECKING if TYPE_CHECKING: from ..keeper_dag.vertex import DAGVertex @@ -59,6 +60,8 @@ def __init__(self, record: Any, logger: Optional[Any] = None, history_level: int self.conn = get_connection(logger=logger, **kwargs) + self._cache: Optional[Dict] = None + @property def dag(self) -> DAG: if self._dag is None: @@ -123,6 +126,12 @@ def close(self): Clean up resources held by this Infrastructure instance. Releases the DAG instance and connection to prevent memory leaks. """ + if self._cache: + for v in self._cache.values(): + v["vertex"] = None + v["content"] = None + self._cache.clear() + if self._dag is not None: self._dag = None self.conn = None @@ -150,6 +159,86 @@ def save(self, delta_graph: Optional[bool] = None): self._dag.save(delta_graph=delta_graph) self.logger.debug(f"infrastructure took {time.time()-ts} secs to save") + def cache_objects(self): + + self.logger.debug(f"building id to infrastructure cache") + + self._cache = {} + + def _cache(v: DAGVertex, parent_content: Optional[DiscoveryObject] = None): + c = DiscoveryObject.get_discovery_object(v) + key = c.object_type_value.lower() + c.id.lower() + self._cache[key] = { + "key": key, + "uid": v.uid, + "parent_uid": parent_content.uid if parent_content else None, + "vertex": v, + "content": c, + "was_found": False, + "could_login": True, + "is_new": False, + "md5": c.md5 + } + + for next_v in v.has_vertices(): + _cache(next_v, c) + + if self.has_discovery_data: + ts = time.time() + _cache(self.get_configuration, None) + self.logger.info(f" infrastructure cache build time: {time.time()-ts} seconds") + else: + self.logger.info(f" no infrastructure data to cache") + + def get_cache_info(self, object_type_value: str, object_id: str) -> Dict: + return self._cache.get(object_type_value.lower() + object_id.lower()) + + def get_cache_info_by_key(self, key: str) -> Dict: + return self._cache.get(key.lower()) + + def get_missing_cache_list(self, uid: Optional[str] = None) -> List[str]: + not_found_list = [] + for k, v in self._cache.items(): + if not v["is_new"] and not v["was_found"]: + if uid is None or uid == v["uid"] or uid == v["parent_uid"]: + not_found_list.append(k) + return not_found_list + + def add_info_to_cache(self, vertex: DAGVertex, content: DiscoveryObject, parent_vertex: Optional[DAGVertex] = None): + if self._cache is None: + self._cache = {} + + key = content.object_type_value.lower() + content.id.lower() + self._cache[key] = { + "key": key, + "uid": vertex.uid, + "parent_uid": parent_vertex.uid if parent_vertex else None, + "vertex": vertex, + "content": content, + "was_found": True, + "could_login": True, + "is_new": True, + "md5": content.md5 + } + + def update_cache_info(self, info: Dict): + key = info["key"] + self._cache[key] = info + + def find_content(self, query: Dict, ignore_case: bool = False) -> Optional[DAGVertex]: + """ + Find the vertex that matches the query. + + Will only find one. + If it does not match, return None + If matches on more, return None + """ + + vertices = self.dag.search_content(query=query, ignore_case=ignore_case) + if len(vertices) != 1: + return None + return vertices[0] + def to_dot(self, graph_format: str = "svg", show_hex_uid: bool = False, show_version: bool = True, show_only_active_vertices: bool = False, show_only_active_edges: bool = False, sync_point: int = None, graph_type: str = "dot"): diff --git a/keepercommander/discovery_common/jobs.py b/keepercommander/discovery_common/jobs.py index ff7198f06..098179da0 100644 --- a/keepercommander/discovery_common/jobs.py +++ b/keepercommander/discovery_common/jobs.py @@ -2,7 +2,7 @@ from .utils import get_connection, make_agent from .types import JobContent, JobItem, Settings, DiscoveryDelta from ..keeper_dag import DAG, EdgeType -from ..keeper_dag.types import PamGraphId, PamEndpoints +from ..keeper_dag.types import PamGraphId import logging import os import base64 @@ -320,26 +320,12 @@ def get_job(self, job_id) -> Optional[JobItem]: # Get the job item from the job vertex DATA edge. # Replace the one from the job history if we have it. try: - job = job_vertex.content_as_object(JobItem) + found_job = job_vertex.content_as_object(JobItem) + if found_job is not None: + job = found_job except Exception as err: self.logger.debug(f"could not find job item on job vertex, use job histry entry: {err}") - # If the job delta is None, check to see if it chunked as vertices. - delta_lookup = {} - vertices = job_vertex.has_vertices() - self.logger.debug(f"found {len(vertices)} delta vertices") - for vertex in vertices: - edge = vertex.get_edge(job_vertex, edge_type=EdgeType.KEY) - delta_lookup[int(edge.path)] = vertex - - json_value = "" - # Sort numerically increasing and then append their content. - # This will re-assemble the JSON - for key in sorted(delta_lookup): - json_value += delta_lookup[key].content_as_str - if json_value != "": - self.logger.debug(f"delta content length is {len(json_value)}") - job.delta = DiscoveryDelta.model_validate_json(json_value) else: self.logger.debug("could not find job vertex") diff --git a/keepercommander/discovery_common/process.py b/keepercommander/discovery_common/process.py index 2e835112a..1a3eaf8f6 100644 --- a/keepercommander/discovery_common/process.py +++ b/keepercommander/discovery_common/process.py @@ -1019,7 +1019,7 @@ def _process_level(self, if admin_uid is not None: self.logger.debug(" found directory user admin, connect to resource") - # self.record_link.belongs_to(admin_uid, add_content.record_uid, acl=acl) + self.record_link.belongs_to(admin_uid, add_content.record_uid, acl=acl) should_prompt_for_admin = False else: self.logger.debug(" did not find the directory user for the admin, " @@ -1562,7 +1562,4 @@ def run(self, self.infra.save(delta_graph=False) self.logger.debug("# ####################################################################################") - # Update the user service mapping - self.user_service.run(infra=self.infra) - return bulk_process_results diff --git a/keepercommander/discovery_common/rm_types.py b/keepercommander/discovery_common/rm_types.py index 3f6d00b3c..38057ba18 100644 --- a/keepercommander/discovery_common/rm_types.py +++ b/keepercommander/discovery_common/rm_types.py @@ -137,6 +137,30 @@ class RmAzureGroupAddMeta(RmMetaBase): group_types: List[str] = [] +class RmGcpUserAddMeta(RmMetaBase): + account_enabled: Optional[bool] = True + display_name: Optional[str] = None + password_reset_required: Optional[bool] = False + password_reset_required_with_mfa: Optional[bool] = False + groups: List[str] = [] + + +class RmGcpGroupAddMeta(RmMetaBase): + group_types: List[str] = [] + + +class RmOktaUserAddMeta(RmMetaBase): + account_enabled: Optional[bool] = True + display_name: Optional[str] = None + password_reset_required: Optional[bool] = False + password_reset_required_with_mfa: Optional[bool] = False + groups: List[str] = [] + + +class RmOktaGroupAddMeta(RmMetaBase): + group_types: List[str] = [] + + class RmDomainUserAddMeta(RmMetaBase): roles: List[str] = [] groups: List[str] = [] @@ -253,6 +277,10 @@ class RmMongoDbRoleAddMeta(RmMetaBase): # MACHINE +class RmUserDeleteBaseMeta(RmMetaBase): + remove_home_dir: Optional[bool] = True + + class RmLinuxGroupAddMeta(RmMetaBase): gid: Optional[int] = None system_group: Optional[bool] = False @@ -291,8 +319,7 @@ class RmLinuxUserAddMeta(RmMachineUserAddMeta): non_system_dir_mode: Optional[str] = None -class RmLinuxUserDeleteMeta(RmMetaBase): - remove_home_dir: Optional[bool] = True +class RmLinuxUserDeleteMeta(RmUserDeleteBaseMeta): remove_user_group: Optional[bool] = True @@ -308,6 +335,10 @@ class RmWindowsUserAddMeta(RmMachineUserAddMeta): groups: List[str] = [] +class RmWindowsUserDeleteMeta(RmUserDeleteBaseMeta): + pass + + class RmMacOsUserAddMeta(RmMachineUserAddMeta): display_name: Optional[str] = None uid: Optional[str] = None @@ -325,6 +356,10 @@ class RmMacOsRoleAddMeta(RmMetaBase): record_name: Optional[str] = None +class RmMacOsUserDeleteMeta(RmUserDeleteBaseMeta): + pass + + # DIRECTORY diff --git a/keepercommander/discovery_common/types.py b/keepercommander/discovery_common/types.py index 710353f44..d4d17d953 100644 --- a/keepercommander/discovery_common/types.py +++ b/keepercommander/discovery_common/types.py @@ -5,6 +5,7 @@ import datetime import base64 import json +import hashlib from keeper_secrets_manager_core.crypto import CryptoUtils from typing import Any, Union, Optional, List, TYPE_CHECKING @@ -524,6 +525,18 @@ class DiscoveryObject(BaseModel): # Specific information for a record type. item: Union[DiscoveryConfiguration, DiscoveryUser, DiscoveryMachine, DiscoveryDatabase, DiscoveryDirectory] + @property + def md5(self) -> str: + data = self.model_dump() + + # Don't include these in the MD5 + data.pop("missing_since_ts", None) + data.pop("access_user", None) + + m = hashlib.md5() + m.update(json.dumps(data).encode('utf-8')) + return m.hexdigest() + @property def record_exists(self): return self.record_uid is not None @@ -603,29 +616,98 @@ class NormalizedRecord(BaseModel): title: str fields: List[RecordField] = [] note: Optional[str] = None + record_exists: bool = True + + def _field(self, + field_type: Optional[str] = None, + label: Optional[str] = None) -> Optional[RecordField]: + if field_type is None and label is None: + raise ValueError("either field_type or label needs to be set to find field in NormalizedRecord.") - def _field(self, field_type, label) -> Optional[RecordField]: for field in self.fields: - value = field.value - if value is None or len(value) == 0: - continue - if field.label == field_type and value[0].lower() == label.lower(): + if field_type is not None and field_type == field.type: + return field + if label is not None and label == field.label: return field return None - def find_user(self, user): + def find_field(self, + field_type: Optional[str] = None, + label: Optional[str] = None) -> Optional[RecordField]: + + return self._field(field_type=field_type, label=label) + + def get_value(self, + field_type: Optional[str] = None, + label: Optional[str] = None) -> Optional[Any]: + + field = self.find_field(field_type=field_type, label=label) + if field is None or field.value is None or len(field.value) == 0: + return None + return field.value[0] + + def get_user(self) -> Optional[str]: + field = self._field(field_type="login") + if field is None: + return None + value = field.value + if isinstance(value, list): + if len(value) == 0: + return None + value = value[0] + return value + + def get_dn(self) -> Optional[str]: + field = self._field(label="distinguishedName") + if field is None: + return None + value = field.value + if isinstance(value, list): + if len(value) == 0: + return None + value = value[0] + return value + + def has_user(self, user) -> bool: from .utils import split_user_and_domain - res = self._field("login", user) - if res is None: - user, _ = split_user_and_domain(user) - res = self._field("login", user) + user, _ = split_user_and_domain(user) + + field = self._field(field_type="login") + if field is None: + return False + + value = field.value + if isinstance(value, list): + if len(value) == 0: + return False + value = value[0] + elif isinstance(value, str): + value = value.lower() + + if user.lower() == value: + return True + + return False + + def has_dn(self, user) -> bool: + field = self._field(label="distinguishedName") + if field is None: + return False - return res + value = field.value + if isinstance(value, list): + if len(value) == 0: + return False + value = value[0] + elif isinstance(value, str): + value = value.lower() - def find_dn(self, user): - return self._field("distinguishedName", user) + if user.lower() == value: + return True + + return False class PromptResult(BaseModel): diff --git a/keepercommander/discovery_common/user_service.py b/keepercommander/discovery_common/user_service.py index 455526ad8..cf58675a2 100644 --- a/keepercommander/discovery_common/user_service.py +++ b/keepercommander/discovery_common/user_service.py @@ -1,21 +1,22 @@ from __future__ import annotations import logging -from .constants import USER_SERVICE_GRAPH_ID, PAM_MACHINE, PAM_USER, PAM_DIRECTORY, DOMAIN_USER_CONFIGS -from .utils import get_connection, user_in_lookup, user_check_list, make_agent -from .types import DiscoveryObject, ServiceAcl, FactsNameUser +import os + +from .constants import PAM_MACHINE, PAM_USER, PAM_DIRECTORY, DOMAIN_USER_CONFIGS +from .utils import get_connection, make_agent, split_user_and_domain, value_to_boolean +from .types import DiscoveryObject, ServiceAcl, NormalizedRecord from .infrastructure import Infrastructure +from .record_link import RecordLink from ..keeper_dag import DAG, EdgeType -from ..keeper_dag.types import PamEndpoints, PamGraphId +from ..keeper_dag.types import PamGraphId import importlib -from typing import Any, Optional, List, TYPE_CHECKING +from typing import Any, Optional, List, Callable, Dict, TYPE_CHECKING if TYPE_CHECKING: from ..keeper_dag.vertex import DAGVertex from ..keeper_dag.edge import DAGEdge -# TODO: Refactor this code; we can make this smaller since method basically do the same functions, just different -# attributes. class UserService: def __init__(self, record: Any, logger: Optional[Any] = None, history_level: int = 0, @@ -23,6 +24,10 @@ def __init__(self, record: Any, logger: Optional[Any] = None, history_level: int save_batch_count: int = 200, agent: Optional[str] = None, **kwargs): + # Keep these for other graphs + self._params = kwargs.get("params") + self._ksm = kwargs.get("ksm") + self.conn = get_connection(**kwargs) # This will either be a KSM Record, or Commander KeeperRecord @@ -44,19 +49,38 @@ def __init__(self, record: Any, logger: Optional[Any] = None, history_level: int self.auto_save = False self.last_sync_point = -1 + self.directory_user_cache: Optional[Dict[str, Dict]] = None + + # Mapping that use to keep track of what relationship have been update. + self.cleanup_mapping = {} + + self.insecure_debug = value_to_boolean(os.environ.get("INSECURE_DEBUG", False)) + self.log_finer_level = 0 + try: + self.log_finer_level = int(os.environ.get("KEEPER_GATEWAY_SERVICE_LOG_FINER_LEVEL", 0)) + except (Exception,): + pass + + def debug(self, msg, level: int = 0, secret: bool = False): + if self.log_finer_level >= level: + if secret: + if self.insecure_debug: + self.logger.debug(msg) + else: + self.logger.debug(msg) + @property def dag(self) -> DAG: if self._dag is None: self._dag = DAG(conn=self.conn, record=self.record, - # endpoint=PamEndpoints.SERVICE_LINKS, graph_id=PamGraphId.SERVICE_LINKS, auto_save=False, logger=self.logger, history_level=self.history_level, debug_level=self.debug_level, - name="Discovery Service/Tasks", + name="Discovery Services", fail_on_corrupt=self.fail_on_corrupt, log_prefix=self.log_prefix, save_batch_count=self.save_batch_count, @@ -64,6 +88,9 @@ def dag(self) -> DAG: self._dag.load(sync_point=0) + # If an empty graph, call root get create a vertex. + _ = self._dag.get_root + return self._dag def close(self): @@ -71,9 +98,11 @@ def close(self): Clean up resources held by this UserService instance. Releases the DAG instance and connection to prevent memory leaks. """ - if self._dag is not None: - self._dag = None + + self._dag = None self.conn = None + self._params = None + self._ksm = None def __enter__(self): """Context manager entry.""" @@ -112,8 +141,11 @@ def get_record_uid(discovery_vertex: DAGVertex) -> str: return content.record_uid raise Exception(f"The discovery vertex {discovery_vertex.uid} data does not have a populated record UID.") - def belongs_to(self, resource_uid: str, user_uid: str, acl: Optional[ServiceAcl] = None, - resource_name: Optional[str] = None, user_name: Optional[str] = None): + def belongs_to(self, + resource_uid: str, + user_uid: str, acl: Optional[ServiceAcl] = None, + resource_name: Optional[str] = None, + user_name: Optional[str] = None): """ Link vault records using record UIDs. @@ -121,24 +153,32 @@ def belongs_to(self, resource_uid: str, user_uid: str, acl: Optional[ServiceAcl] If a link already exists, no additional link will be created. """ + if resource_uid is None: + self.debug("resource_uid is blank, do not connect") + return + if user_uid is None: + self.debug("user_uid is blank, do not connect") + return + # Get thr record vertices. # If a vertex does not exist, then add the vertex using the record UID resource_vertex = self.dag.get_vertex(resource_uid) if resource_vertex is None: - self.logger.debug(f"adding resource vertex for record UID {resource_uid} ({resource_name})") + self.debug(f"adding resource vertex for record UID {resource_uid} ({resource_name})") resource_vertex = self.dag.add_vertex(uid=resource_uid, name=resource_name) user_vertex = self.dag.get_vertex(user_uid) if user_vertex is None: - self.logger.debug(f"adding user vertex for record UID {user_uid} ({user_name})") + self.debug(f"adding user vertex for record UID {user_uid} ({user_name})") user_vertex = self.dag.add_vertex(uid=user_uid, name=user_name) - self.logger.debug(f"user {user_vertex.uid} controls services on {resource_vertex.uid}") + self.debug(f"user {user_vertex.uid} controls services on {resource_vertex.uid}") edge_type = EdgeType.LINK if acl is not None: edge_type = EdgeType.ACL + self.debug(f"Connect {user_vertex.uid} to {resource_vertex.uid}") user_vertex.belongs_to(resource_vertex, edge_type=edge_type, content=acl) def disconnect_from(self, resource_uid: str, user_uid: str): @@ -156,11 +196,16 @@ def get_acl(self, resource_uid, user_uid) -> Optional[ServiceAcl]: resource_vertex = self.dag.get_vertex(resource_uid) user_vertex = self.dag.get_vertex(user_uid) if resource_vertex is None or user_vertex is None: - self.logger.debug(f"there is no acl between {resource_uid} and {user_uid}") + if resource_vertex is None: + self.debug("The resource vertex does not exists get; return default ACL") + if user_vertex is None: + self.debug("The user vertex does not exists get; return default ACL") return ServiceAcl() acl_edge = user_vertex.get_edge(resource_vertex, edge_type=EdgeType.ACL) # type: DAGEdge if acl_edge is None: + self.debug(f"ACL does not exists between resource {resource_uid} and user {user_vertex} doesn't " + "exist; return None") return None return acl_edge.content_as_object(ServiceAcl) @@ -206,10 +251,10 @@ def delete(vertex: DAGVertex): def save(self): if self.dag.has_graph: - self.logger.debug("saving the service user.") + self.debug("saving the service user.") self.dag.save(delta_graph=False) else: - self.logger.debug("the service user graph does not contain any data, was not saved.") + self.debug("the service user graph does not contain any data, was not saved.") def to_dot(self, graph_format: str = "svg", show_version: bool = True, show_only_active_vertices: bool = True, show_only_active_edges: bool = True, graph_type: str = "dot"): @@ -230,7 +275,7 @@ def to_dot(self, graph_format: str = "svg", show_version: bool = True, show_only else: dot.attr(layout=graph_type) - self.logger.debug(f"have {len(self.dag.all_vertices)} vertices") + self.debug(f"have {len(self.dag.all_vertices)} vertices") for v in self.dag.all_vertices: if show_only_active_vertices is True and v.active is False: continue @@ -301,378 +346,579 @@ def to_dot(self, graph_format: str = "svg", show_version: bool = True, show_only return dot - def _get_directory_user_vertices(self, configuration_vertex: DAGVertex, domain_name: str) -> List[DAGVertex]: + def _init_cleanup_user_mapping(self): + """ - Find the directory in the graph and return of list of user vertices. + Create of mapping of existing user services to see what was updated. + + This is the basically graph in dictionary format with the update flag set to False. """ - domain_name = domain_name.lower() - - user_vertices: List[DAGVertex] = [] - - # Check the configuration; it might provide domains. - # Need to only include the user vertices. - # If we find it here, we don't need to check for directories; so return with the list. - config_content = DiscoveryObject.get_discovery_object(configuration_vertex) - if config_content.record_type in DOMAIN_USER_CONFIGS: - config_domains = config_content.item.info.get("domains", []) - self.logger.debug(f" the provider provides domains: {config_domains}") - for config_domain in config_domains: - if config_domain.lower() == domain_name: - self.logger.debug(f" matched for {domain_name}") - for vertex in configuration_vertex.has_vertices(): - content = DiscoveryObject.get_discovery_object(vertex) - if content.record_type == PAM_USER: - user_vertices.append(vertex) - self.logger.debug(f" found {len(user_vertices)} users for {domain_name}") - return user_vertices - - self.logger.debug(" checking pam directories for users") - - # If the configuration did not have domain users, or there were do users, check the PAM Directories. - for resource_vertex in configuration_vertex.has_vertices(): - content = DiscoveryObject.get_discovery_object(resource_vertex) - if content.record_type != PAM_DIRECTORY: - continue - if content.name.lower() == domain_name: - user_vertices = resource_vertex.has_vertices() - self.logger.debug(f" found {len(user_vertices)} users for {domain_name}") - break + self.cleanup_mapping = {} + for user_service_machine in self.dag.get_root.has_vertices(): + if user_service_machine.uid not in self.cleanup_mapping: + self.cleanup_mapping[user_service_machine.uid] = {} + for user_service_user in user_service_machine.has_vertices(): + self.cleanup_mapping[user_service_machine.uid][user_service_user.uid] = False - return user_vertices + def _user_is_used(self, machine_record_uid: str, user_record_uid: str): - def _get_user_vertices(self, - infra_resource_content: DiscoveryObject, - infra_resource_vertex: DAGVertex) -> List[DAGVertex]: + """ + Flag the user exists for a machine. + """ - self.logger.debug(f" getting users for {infra_resource_content.name}") + if machine_record_uid in self.cleanup_mapping and user_record_uid in self.cleanup_mapping[machine_record_uid]: + self.cleanup_mapping[machine_record_uid][user_record_uid] = True - # If this machine joined to a directory. - # Since this a Windows machine, we can have only one joined directory; take the first one. - domain_name = None - if len(infra_resource_content.item.facts.directories) > 0: - domain_name = infra_resource_content.item.facts.directories[0].domain - self.logger.debug(f" joined to {domain_name}") - - # Get a list of local users. - # If the machine is joined to a domain, get a list of users from that domain. - user_vertices = infra_resource_vertex.has_vertices() - self.logger.debug(f" found {len(user_vertices)} local users") - if domain_name is not None: - user_vertices += self._get_directory_user_vertices( - configuration_vertex=infra_resource_vertex.belongs_to_vertices()[0], - domain_name=domain_name - ) - - self.logger.debug(f" found {len(user_vertices)} total users") - - return user_vertices - - def _connect_service_users(self, - infra_resource_content: DiscoveryObject, - infra_resource_vertex: DAGVertex, - services: List[FactsNameUser]): - - self.logger.debug(f"processing services for {infra_resource_content.description} ({infra_resource_vertex.uid})") - - # We don't care about the name of the service, we just need a list users. - lookup = {} - for service in services: - lookup[service.user.lower()] = True - - infra_user_vertices = self._get_user_vertices(infra_resource_content=infra_resource_content, - infra_resource_vertex=infra_resource_vertex) - - for infra_user_vertex in infra_user_vertices: - infra_user_content = DiscoveryObject.get_discovery_object(infra_user_vertex) - if infra_user_content.record_uid is None: + def _cleanup_users(self): + + """ + Disconnect all users from machines that are not used. + """ + + self.debug("cleaning up unused user service relationships") + + did_something = False + for machine_record_uid in self.cleanup_mapping: + for user_record_uid in self.cleanup_mapping[machine_record_uid]: + if not self.cleanup_mapping[machine_record_uid][user_record_uid]: + self.debug(f" * disconnect user {user_record_uid} from machine {machine_record_uid}") + did_something = True + self.disconnect_from(machine_record_uid, user_record_uid) + if not did_something: + self.debug(f" nothing to cleanup") + + @staticmethod + def _get_local_users_from_record(record_lookup_func: Callable, + rl_machine_vertex: DAGVertex) -> Dict[str, str]: + + # Get the local users + user_records: Dict[str, str] = {} + + for rl_user_vertex in rl_machine_vertex.has_vertices(): + record = record_lookup_func(rl_user_vertex.uid, allow_sm=False) # type: NormalizedRecord + if record and record.record_type == PAM_USER: + user = record.get_user() + if user is not None: + user, domain = split_user_and_domain(user.lower()) + if domain is not None: + user += "@" + domain + user_records[user] = record.record_uid + + return user_records + + @staticmethod + def _get_local_users_from_infra(record_lookup_func: Callable, + infra_machine_vertex: DAGVertex) -> Dict[str, str]: + + user_records: Dict[str, str] = {} + for infra_user_vertex in infra_machine_vertex.has_vertices(): + user_content = DiscoveryObject.get_discovery_object(infra_user_vertex) + if user_content.record_type != PAM_USER or user_content.record_uid is None: continue - if user_in_lookup( - lookup=lookup, - user=infra_user_content.item.user, - name=infra_user_content.name, - source=infra_user_content.item.source): - self.logger.debug(f" * found user for service: {infra_user_content.item.user}") - acl = self.get_acl(infra_resource_content.record_uid, infra_user_content.record_uid) - if acl is None: - acl = ServiceAcl() - acl.is_service = True - self.belongs_to( - resource_uid=infra_resource_content.record_uid, - resource_name=infra_resource_content.uid, - user_uid=infra_user_content.record_uid, - user_name=infra_user_content.uid, - acl=acl) - - def _connect_task_users(self, - infra_resource_content: DiscoveryObject, - infra_resource_vertex: DAGVertex, - tasks: List[FactsNameUser]): - - self.logger.debug(f"processing tasks for {infra_resource_content.description} ({infra_resource_vertex.uid})") - - # We don't care about the name of the tasks, we just need a list users. - lookup = {} - for task in tasks: - lookup[task.user.lower()] = True - - infra_user_vertices = self._get_user_vertices(infra_resource_content=infra_resource_content, - infra_resource_vertex=infra_resource_vertex) - - for infra_user_vertex in infra_user_vertices: - infra_user_content = DiscoveryObject.get_discovery_object(infra_user_vertex) - if infra_user_content.record_uid is None: + if record_lookup_func(user_content.record_uid, allow_sm=False): + user, domain = split_user_and_domain(user_content.item.user.lower()) + if domain is not None: + user += "@" + domain + user_records[user] = user_content.record_uid + + return user_records + + def _get_directory_users_from_conf_record(self, + record_linking: RecordLink, + domain_name: str, + record_lookup_func: Callable) -> Dict[str, str]: + + user_records: Dict[str, str] = {} + + # check if a PAM configuration that support having users (Azure, Domain Controller) + # We need to get the normalized record of the configuration record. + configuration_record = record_lookup_func( + self.conn.get_record_uid(self.record), allow_sm=False) # type: NormalizedRecord + if configuration_record.record_type in DOMAIN_USER_CONFIGS: + # The Domain Controller record will have the domain; Azure record will not. + config_domain_name = configuration_record.get_value(label="pamdomainid") + + # If the domain name is not set, or it is, and we match the one that machine is joined to. + if config_domain_name is None or config_domain_name.lower() == domain_name: + config_vertex = record_linking.dag.get_vertex(configuration_record.record_uid) + for child_vertex in config_vertex.has_vertices(): + user_record = record_lookup_func(child_vertex.uid, allow_sm=False) # type: NormalizedRecord + if not user_record: + # self.debug(f" * record uid {child_vertex.uid} not found") + continue + if user_record.record_type != PAM_USER: + # self.debug(f" * record uid {child_vertex.uid} is not PAM User") + continue + user, domain = split_user_and_domain(user_record.get_user().lower()) + if domain is None: + domain = domain_name + user += "@" + domain + user_records[user] = user_record.record_uid + else: + self.debug(f" domain name {config_domain_name} does not match {domain_name}") + else: + self.debug(" configuration type does not allow AD users") + + return user_records + + def _get_directory_users_from_conf_infra(self, + infra: Infrastructure, + domain_name: str, + record_lookup_func: Callable) -> Dict[str, str]: + + user_records: Dict[str, str] = {} + + config_vertex = infra.get_configuration + config_context = DiscoveryObject.get_discovery_object(config_vertex) + if config_context.record_type in DOMAIN_USER_CONFIGS: + for config_domain_name in config_context.item.info.get("domains", []): + if config_domain_name != domain_name: + self.debug(f" domain name {config_domain_name} does not match {domain_name}") + continue + for child_vertex in config_vertex.has_vertices(): + child_context = DiscoveryObject.get_discovery_object(child_vertex) + if child_context.record_type == PAM_USER and record_lookup_func(child_context.record_uid, + allow_sm=False): + user, domain = split_user_and_domain(child_context.item.user.lower()) + if domain is None: + domain = domain_name + user += "@" + domain + user_records[user] = child_context.record_uid + + return user_records + + def _get_directory_users_from_records(self, + record_linking: RecordLink, + domain_name: str, + record_lookup_func: Callable) -> Dict[str, str]: + + user_records: Dict[str, str] = {} + + # From the record linking graph, check each record connected to the configuration to see if it is a + # PAM directory record. + for rl_resource_vertex in record_linking.dag.get_root.has_vertices(): + directory_record = record_lookup_func(rl_resource_vertex.uid, allow_sm=False) # type: NormalizedRecord + if directory_record and directory_record.record_type == PAM_DIRECTORY: + record_domain_name = directory_record.get_value(label="domainName") + if record_domain_name is None: + self.logger.warning(f" record uid {rl_resource_vertex.uid} is a directory, but the " + "Domain Name is not set.") + continue + if record_domain_name.lower() == domain_name: + self.debug(f" record uid {rl_resource_vertex.uid} matches the domain name") + for rl_user_vertex in rl_resource_vertex.has_vertices(): + user_record = record_lookup_func(rl_user_vertex.uid, allow_sm=False) # type: NormalizedRecord + if user_record is None or user_record.record_type != PAM_USER: + continue + + # Get the directory users, format the username to be user@domain + user = user_record.get_user() + if user is not None: + user, domain = split_user_and_domain(user.lower()) + if domain is None: + domain = domain_name + user += "@" + domain + user_records[user] = user_record.record_uid + else: + self.debug(f" ! record uid {rl_user_vertex.uid} has a blank user") + + return user_records + + @staticmethod + def _get_directory_users_from_infra(infra_machine_vertex: DAGVertex, + domain_name: str, + record_lookup_func: Callable) -> Dict[str, str]: + + user_records: Dict[str, str] = {} + + configuration_vertex = infra_machine_vertex.belongs_to_vertices()[0] + for resource_vertex in configuration_vertex.has_vertices(): + if not resource_vertex.has_data: continue - if user_in_lookup( - lookup=lookup, - user=infra_user_content.item.user, - name=infra_user_content.name, - source=infra_user_content.item.source): - self.logger.debug(f" * found user for task: {infra_user_content.item.user}") - acl = self.get_acl(infra_resource_content.record_uid, infra_user_content.record_uid) - if acl is None: - acl = ServiceAcl() - acl.is_task = True - self.belongs_to( - resource_uid=infra_resource_content.record_uid, - resource_name=infra_resource_content.uid, - user_uid=infra_user_content.record_uid, - user_name=infra_user_content.uid, - acl=acl) - - def _connect_iis_pool_users(self, - infra_resource_content: DiscoveryObject, - infra_resource_vertex: DAGVertex, - iis_pools: List[FactsNameUser]): - - self.logger.debug(f"processing iis pools for " - f"{infra_resource_content.description} ({infra_resource_vertex.uid})") - - # We don't care about the name of the tasks, we just need a list users. - lookup = {} - for iis_pool in iis_pools: - lookup[iis_pool.user.lower()] = True - - infra_user_vertices = self._get_user_vertices(infra_resource_content=infra_resource_content, - infra_resource_vertex=infra_resource_vertex) - - for infra_user_vertex in infra_user_vertices: - infra_user_content = DiscoveryObject.get_discovery_object(infra_user_vertex) - if infra_user_content.record_uid is None: + resource_content = DiscoveryObject.get_discovery_object(resource_vertex) + if resource_content.record_type != PAM_DIRECTORY or resource_content.name.lower() != domain_name: continue - if user_in_lookup( - lookup=lookup, - user=infra_user_content.item.user, - name=infra_user_content.name, - source=infra_user_content.item.source): - self.logger.debug(f" * found user for iis pool: {infra_user_content.item.user}") - acl = self.get_acl(infra_resource_content.record_uid, infra_user_content.record_uid) - if acl is None: - acl = ServiceAcl() - acl.is_iis_pool = True - self.belongs_to( - resource_uid=infra_resource_content.record_uid, - resource_name=infra_resource_content.uid, - user_uid=infra_user_content.record_uid, - user_name=infra_user_content.uid, - acl=acl) - - def _validate_users(self, - infra_resource_content: DiscoveryObject, - infra_resource_vertex: DAGVertex): + for user_vertex in resource_vertex.has_vertices(): + if not user_vertex.has_data: + continue + user_content = DiscoveryObject.get_discovery_object(user_vertex) + if user_content.record_type != PAM_USER and user_content.record_uid is None: + continue + if record_lookup_func(user_content.record_uid, allow_sm=False): + + # Format the username to be user@domain + user, domain = split_user_and_domain(user_content.item.user.lower()) + if domain is None: + domain = domain_name + user += "@" + domain + user_records[user] = user_content.record_uid + return user_records + + def _get_users(self, + infra: Infrastructure, + infra_machine_content: DiscoveryObject, + infra_machine_vertex: DAGVertex, + record_linking: RecordLink, + record_lookup_func: Callable) -> Dict[str, str]: """ - This method will check to see if a resource's users' ACL edges are still valid. + Get local and directory users for machine. + + The return values will be a dictionary of record_uid to username. - This check will check both local and directory users. + It will first check the records linking graph. Then check the infrastructure graph. """ - self.logger.debug(f"validate existing user service edges to see if still valid to " - f"{infra_resource_content.name}") + self.debug(f" getting users for {infra_machine_content.name}, {infra_machine_content.record_uid}") - service_lookup = {} - for service in infra_resource_content.item.facts.services: - service_lookup[service.user.lower()] = True + # Get the domain name that the machine it joined to. + # Only accept the first one; we are Windows, only allow one domain. + domain_name = None + for directory in infra_machine_content.item.facts.directories: + if directory.domain is not None: + domain_name = directory.domain.lower() + self.debug(f" machine is joined to {domain_name}") + break + + # Keep separate dictionaries since we are going to cache the directory users by domain name. + # { "user": "record uid", ... } + local_user_records: Dict[str, str] = {} + directory_user_records: Dict[str, str] = {} + + using_directory_user_cache = False + if domain_name: + # Once we get directory users for a domain name, they will not change. + # Cache them so we don't have to get them again. + if self.directory_user_cache is not None: + directory_user_records = self.directory_user_cache.get(domain_name) + self.debug(f" using directory user cache for {domain_name}, " + f"{len(directory_user_records)} users") + using_directory_user_cache = True + + ########################### + + # Find the users using the record linking graph. + self.debug(f" getting users from record linking", level=1) + record_link_vertex = record_linking.dag.get_vertex(infra_machine_content.record_uid) + if record_link_vertex is None: + self.debug(" record uid {machine_record_uid} does not exist in the Vault.", level=1) + else: - task_lookup = {} - for task in infra_resource_content.item.facts.tasks: - task_lookup[task.user.lower()] = True + # Get the local users from records + self.debug(" getting local users from records", level=1) + user_records = self._get_local_users_from_record(rl_machine_vertex=record_link_vertex, + record_lookup_func=record_lookup_func) + self.debug(f" * found {len(user_records)} local users from records", level=1) + local_user_records = {**local_user_records, **user_records} - iis_pool_lookup = {} - for iss_pool in infra_resource_content.item.facts.iis_pools: - iis_pool_lookup[iss_pool.user.lower()] = True + if not using_directory_user_cache and domain_name is not None: - # Get the user service resource vertex. - # If it does not exist, then we cannot validate users. - user_service_resource_vertex = self.dag.get_vertex(infra_resource_content.record_uid) - if user_service_resource_vertex is None: - return + self.debug(" getting directory users from the configuration record", level=1) + user_records = self._get_directory_users_from_conf_record(record_linking=record_linking, + domain_name=domain_name, + record_lookup_func=record_lookup_func) - infra_dag = infra_resource_vertex.dag + self.debug(f" * found {len(user_records)} directory users records from " + "the configuration record", level=1) + directory_user_records = {**directory_user_records, **user_records} - # The users from the service graph will contain local and directory users. - for user_service_user_vertex in user_service_resource_vertex.has_vertices(): - acl_edge = user_service_user_vertex.get_edge( - user_service_resource_vertex, edge_type=EdgeType.ACL) # type: DAGEdge - if acl_edge is None: - self.logger.info(f"User record {user_service_user_vertex.uid} does not have an ACL edge to " - f"{user_service_resource_vertex.uid} for user services.") - continue + self.debug(" getting directory users from directory records", level=1) + user_records = self._get_directory_users_from_records(record_linking=record_linking, + domain_name=domain_name, + record_lookup_func=record_lookup_func) + self.debug(f" * found {len(user_records)} directory users from records for {domain_name}", + level=1) + + directory_user_records = {**directory_user_records, **user_records} + + #################### + + # Find the users via infrastructure graph + + self.debug(f" getting users from infrastructure", level=1) + self.debug(" getting local users from infrastructure", level=1) + user_records = self._get_local_users_from_infra(infra_machine_vertex=infra_machine_vertex, + record_lookup_func=record_lookup_func) + self.debug(f" * found {len(user_records)} local users from graph", level=1) + local_user_records = {**user_records, **local_user_records} + + if not using_directory_user_cache and domain_name is not None: + + self.debug(" getting directory users from configuration infrastructure", level=1) + user_records = self._get_directory_users_from_conf_infra(infra=infra, + domain_name=domain_name, + record_lookup_func=record_lookup_func) + self.debug(f" * found {len(user_records)} directory users from configuration for {domain_name}", + level=1) + directory_user_records = {**user_records, **directory_user_records} + + # ------------- + + self.debug(" getting directory users from directory infrastructure", level=1) + user_records = self._get_directory_users_from_infra(infra_machine_vertex=infra_machine_vertex, + domain_name=domain_name, + record_lookup_func=record_lookup_func) + self.debug(f" * found {len(user_records)} directory users from graph for {domain_name}", level=1) + directory_user_records = {**user_records, **directory_user_records} + + # If we were not using the directory cache, cache them. + if domain_name is not None and not using_directory_user_cache: + if self.directory_user_cache is None: + self.directory_user_cache = {} + self.directory_user_cache[domain_name] = directory_user_records + + all_record = {**directory_user_records, **local_user_records} + + self.debug(f" total union of users count {len(all_record.keys())}") + + return all_record + + def _connect_users_to_services(self, + infra: Infrastructure, + infra_machine_content: DiscoveryObject, + infra_machine_vertex: DAGVertex, + record_linking: RecordLink, + record_lookup_func: Callable, + strict: bool = False): + + domain_name = None + for directory in infra_machine_content.item.facts.directories: + if directory.domain is not None: + domain_name = directory.domain.lower() + break + + # Add mapping from user to machine, that control services. + for service_type in ["service", "task", "iis_pool"]: + self.debug("-" * 40) + self.debug(f"processing {service_type}s for {infra_machine_content.name} " + f"({infra_machine_vertex.uid})") + + # We don't care about the name of the service, we just need a list users. + service_users = [] + for service_user in getattr(infra_machine_content.item.facts, f"{service_type}s"): + self.debug(f" * {service_type}: {service_user.name} ({service_user.user})", secret=True) + user = service_user.user.lower() + if not strict: + user, domain = split_user_and_domain(user) + service_users.append(user) + if domain is not None and domain != ".": + service_users.append(user + "@" + domain) + service_users.append(user + "@" + domain.split(".")[0]) + if domain_name is not None: + service_users.append(user + "@" + domain_name) + service_users.append(user + "@" + domain_name.split(".")[0]) - found_service_acl = False - found_task_acl = False - found_iis_pool_acl = False - changed = False - - acl = acl_edge.content_as_object(ServiceAcl) - - # This will check the entire infrastructure graph for the user with the record UID. - # This could be a local or directory users. - user = infra_dag.search_content({"record_type": PAM_USER, "record_uid": user_service_user_vertex.uid}) - infra_user_content = None - found_user = len(user) > 0 - if found_user: - infra_user_vertex = user[0] - if infra_user_vertex.active is False: - found_user = False else: - infra_user_content = DiscoveryObject.get_discovery_object(infra_user_vertex) + service_users.append(user) + + service_users = list(set(service_users)) - if not found_user: - self.disconnect_from(user_service_resource_vertex.uid, user_service_user_vertex.uid) + if len(service_users) == 0: + self.debug(f" no users control {service_type}s, skipping.") continue - check_list = user_check_list( - user=infra_user_content.item.user, - name=infra_user_content.name, - source=infra_user_content.item.source - ) - - if acl.is_service: - for check_user in check_list: - if check_user in service_lookup: - found_service_acl = True - break - if not found_service_acl: - acl.is_service = False - changed = True - - if acl.is_task: - for check_user in check_list: - if check_user in task_lookup: - found_task_acl = True - break - if not found_task_acl: - acl.is_task = False - changed = True - - if acl.is_iis_pool: - for check_user in check_list: - if check_user in iis_pool_lookup: - found_iis_pool_acl = True - break - if not found_iis_pool_acl: - acl.is_iis_pool = False - changed = True - - if (found_service_acl is True or found_task_acl is True or found_iis_pool_acl is True) or changed is True: - self.logger.debug(f"user {user_service_user_vertex.uid}(US) to " - f"{user_service_resource_vertex.uid} updated") - self.belongs_to(user_service_resource_vertex.uid, user_service_user_vertex.uid, acl) - elif found_service_acl is False and found_task_acl is False and found_iis_pool_acl is False: - self.logger.debug(f"user {user_service_user_vertex.uid}(US) to " - f"{user_service_resource_vertex.uid} disconnected") - self.disconnect_from(user_service_resource_vertex.uid, user_service_user_vertex.uid) - - self.logger.debug(f"DONE validate existing user") - - def run(self, infra: Optional[Infrastructure] = None, **kwargs): + users = self._get_users(infra=infra, + infra_machine_content=infra_machine_content, + infra_machine_vertex=infra_machine_vertex, + record_linking=record_linking, + record_lookup_func=record_lookup_func) + + if self.log_finer_level >= 2 and self.insecure_debug: + for k, v in users.items(): + self.debug(f"> {k} = {v}") + + self.debug(f"users to check: {service_users}", secret=True) + for service_user in service_users: + self.debug(f" * {service_user}", secret=True) + if service_user in users: + record_uid = users[service_user] + self.debug(f" found user {service_user} for {service_type}", secret=True) + acl = self.get_acl(infra_machine_content.record_uid, record_uid) + if acl is None: + acl = ServiceAcl() + acl_attr = "is_" + service_type + + # Flag the user was found; don't disconnect + self._user_is_used(machine_record_uid=infra_machine_content.record_uid, + user_record_uid=record_uid) + + # Only update if the attribute is currently False; reduce edges. + if getattr(acl, acl_attr) is False: + setattr(acl, acl_attr, True) + self.belongs_to(resource_uid=infra_machine_content.record_uid, + user_uid=record_uid, + acl=acl) + + def _get_resource_info(self, + record_uid: str, + infra: Infrastructure, + record_lookup_func: Callable, + record_types: Optional[List[str]] = None) -> Optional[NormalizedRecord]: + """ - Map users to services/tasks on machines. + Find a resource, or user, in the Vault or in the Infrastructure graph. + + This will return a NormalizedRecord record. + This doesn't mean the - IMPORTANT: To avoid memory leaks, pass an existing Infrastructure instance - instead of letting this method create a new one. Example: - user_service.run(infra=process.infra) """ - self.logger.debug("") - self.logger.debug("##########################################################################################") - self.logger.debug("# MAP USER TO MACHINE FOR SERVICE/TASKS") - self.logger.debug("") - - # If an instance of Infrastructure is not passed in. - # NOTE: Creating a new Infrastructure instance here can cause memory leaks. - # Prefer passing an existing instance via the infra parameter. - _cleanup_infra_on_exit = False - if infra is None: - self.logger.warning("Creating new Infrastructure instance - consider passing existing instance to avoid memory leaks") - - # Get ksm from the connection. - # However, this might be a local connection, so check first. - # Local connections don't need ksm. - if hasattr(self.conn, "ksm"): - kwargs["ksm"] = getattr(self.conn, "ksm") - - # Get the entire infrastructure graph; sync point = 0 - infra = Infrastructure(record=self.record, **kwargs) - infra.load() - _cleanup_infra_on_exit = True - - # Work ourselves to the configuration vertex. - infra_root_vertex = infra.get_root - infra_config_vertex = infra_root_vertex.has_vertices()[0] - - # For the user service, the root vertex is the equivalent to the infrastructure configuration vertex. - user_service_config_vertex = self.dag.get_root - - # Find all the resources that are machines. - for infra_resource_vertex in infra_config_vertex.has_vertices(): - if infra_resource_vertex.active is False or infra_resource_vertex.has_data is False: - continue - infra_resource_content = DiscoveryObject.get_discovery_object(infra_resource_vertex) - if infra_resource_content.record_type == PAM_MACHINE: + # Check the record first; return a NormalizedRecord + record = record_lookup_func(record_uid, allow_sm=False) # type: NormalizedRecord + if record is not None: + self.debug(f" resource is {record.title}") + if record_types is not None and record.record_type not in record_types: + self.debug(f" not correct record type: {record.record_type}") + return None + return record + else: + self.debug(" not in Vault") + + infra_vertices = infra.dag.search_content({"record_uid": record_uid}) + if not len(infra_vertices): + self.debug(" not in infrastructure graph") + return None - self.logger.debug(f"checking {infra_resource_content.name}") + for vertex in infra_vertices: + if vertex.active: + content = DiscoveryObject.get_discovery_object(vertex) + record = NormalizedRecord( + record_uid=record_uid, + record_type=content.record_type, + title=content.title, + record_exists=False + ) + for field in content.fields: + record.fields.append(field) + + return record + + return None + + def run_user(self): + pass + + def run_full(self, + record_lookup_func: Callable, + infra: Optional[Infrastructure] = None, + record_linking: Optional[RecordLink] = None, + **kwargs): + """ + Map users to services on machines. - # Check the user on the resource if they still are part of a service or task. - self._validate_users(infra_resource_content, infra_resource_vertex) + This is driven by the record linking graph. + + :param infra: Instance of Infrastructure graph. + :param record_linking: Instance of the Record Linking graph. + :param record_lookup_func: A function that will return a record by record id. Returns a normalize record. + """ + + self.debug("") + self.debug("##########################################################################################") + self.debug("# MAP USER TO MACHINE FOR SERVICES") + self.debug("") + + # Load fresh + + created_infra = False + created_record_linking = False + + try: + + # Make of map of the current user to machine relationship. + self._init_cleanup_user_mapping() + + if not infra: + infra = Infrastructure(record=self.record, logger=self.logger, ksm=self._ksm, params=self._params) + infra.load(sync_point=0) + created_infra = True + + if not record_linking: + record_linking = RecordLink(record=self.record, logger=self.logger, ksm=self._ksm, params=self._params) + created_record_linking = True + + # The PAM Configuration record is the root vertex of the PAM/record linking graph. + rl_configuration_vertex = record_linking.dag.get_root + + # At this level the vertex will either be a resource or a cloud user. + for rl_resource_vertex in rl_configuration_vertex.has_vertices(): + + self.debug(f"checking record {rl_resource_vertex.uid}") + + # This will get machine from the records or from infrastructure graph. + # The results is a NormalizedRecord. + machine_record = self._get_resource_info(record_uid=rl_resource_vertex.uid, + infra=infra, + record_lookup_func=record_lookup_func, + record_types=[PAM_MACHINE]) + + if machine_record is None: + self.debug(" could not find record") + continue + + if machine_record.record_type != PAM_MACHINE: + self.debug(" record is not PAM Machine") + continue + + self.debug(f" checking machine {machine_record.title}") + + # Since the facts hold information about services, get those from the infrastructure graph. + infra_machine_vertex = infra.find_content({"record_uid": machine_record.record_uid}) + if not infra_machine_vertex: + self.debug(" could not find machine in the infrastructure graph, skipping") + continue + if not infra_machine_vertex.has_data: + self.debug(" machine has no data yet, skipping") + continue + + infra_machine_content = DiscoveryObject.get_discovery_object(infra_machine_vertex) + + # The `services` are currently on Windows machine, skip any machine that is not running Windows. + if infra_machine_content.item.os != "windows": + self.debug(" machine is not Windows, skipping") + continue # Do we have services, tasks, iis_pools that are run as a user with a password? - if infra_resource_content.item.facts.has_service_items is True: - - # If the resource does not exist in the user service graph, add a vertex and link it to the - # user service root/configuration vertex. - user_service_resource_vertex = self.dag.get_vertex(infra_resource_content.record_uid) - if user_service_resource_vertex is None: - user_service_resource_vertex = self.dag.add_vertex(uid=infra_resource_content.record_uid, - name=infra_resource_content.description) - if not user_service_config_vertex.has(user_service_resource_vertex): - user_service_resource_vertex.belongs_to_root(EdgeType.LINK) - - # Do we have services that are run as a user with a password? - if infra_resource_content.item.facts.has_services is True: - self._connect_service_users( - infra_resource_content, - infra_resource_vertex, - infra_resource_content.item.facts.services) - - # Do we have tasks that are run as a user with a password? - if infra_resource_content.item.facts.has_tasks is True: - self._connect_task_users( - infra_resource_content, - infra_resource_vertex, - infra_resource_content.item.facts.tasks) - - # Do we have tasks that are run as a user with a password? - if infra_resource_content.item.facts.has_iis_pools is True: - self._connect_iis_pool_users( - infra_resource_content, - infra_resource_vertex, - infra_resource_content.item.facts.iis_pools) - - self.save() - - # Clean up the Infrastructure instance if we created it - if _cleanup_infra_on_exit and infra is not None: - self.logger.debug("cleaning up Infrastructure instance created in run()") - infra.close() + if not infra_machine_content.item.facts.has_service_items: + self.debug(" machine has no user controlled services, skipping") + continue + + user_service_machine_vertex = self.dag.get_vertex(infra_machine_content.record_uid) + + # If the resource does not exist in the user service graph, add a vertex and link it to the + # user service root/configuration vertex. + if user_service_machine_vertex is None: + user_service_machine_vertex = self.dag.add_vertex(uid=infra_machine_content.record_uid, + name=infra_machine_content.name) + + # If the UserService resource vertex is not connect to root, connect it. + if not self.dag.get_root.has(user_service_machine_vertex): + user_service_machine_vertex.belongs_to_root(EdgeType.LINK) + + self.debug("-" * 40) + self._connect_users_to_services( + infra=infra, + infra_machine_content=infra_machine_content, + infra_machine_vertex=infra_machine_vertex, + record_linking=record_linking, + record_lookup_func=record_lookup_func) + self.debug("-" * 40) + + # Disconnect any users not used. + # TODO - Handle this better. + # If a machine is off, or we cannot connect, we might disconnect users. + # This needs more testing. + # self._cleanup_users() + + self.save() + + except Exception as err: + self.logger.error(f"could not map users to services: {err}") + raise err + + finally: + if created_infra: + infra.close() + if created_record_linking: + record_linking.close() diff --git a/keepercommander/discovery_common/utils.py b/keepercommander/discovery_common/utils.py index 0a5c7db92..5d8b06d0d 100644 --- a/keepercommander/discovery_common/utils.py +++ b/keepercommander/discovery_common/utils.py @@ -4,7 +4,7 @@ from .types import DiscoveryObject from ..keeper_dag.vertex import DAGVertex from .__version__ import __version__ -from typing import List, Optional, Tuple, TYPE_CHECKING +from typing import Optional, Tuple, TYPE_CHECKING if TYPE_CHECKING: from ..keeper_dag.dag import DAG @@ -65,62 +65,28 @@ def get_connection(**kwargs): def split_user_and_domain(user: str) -> Tuple[Optional[str], Optional[str]]: + """ + If the username is a UPN, email, netbios\\username, break it apart into user and domain/netbios. + """ + if user is None: return None, None domain = None - if "\\" in user: - user_parts = user.split("\\", maxsplit=1) + if "@" in user: + user_parts = user.split("@", maxsplit=1) user = user_parts[0] + if "\\" in user: + _, user = user.split("\\") domain = user_parts[1] - elif "@" in user: - user_parts = user.split("@") - domain = user_parts.pop() - user = "@".join(user_parts) + elif "\\" in user: + user_parts = user.split("\\", maxsplit=1) + user = user_parts[1].replace("\\", "") + domain = user_parts[0] return user, domain - -def user_check_list(user: str, name: Optional[str] = None, source: Optional[str] = None) -> List[str]: - user, domain = split_user_and_domain(user) - user = user.lower() - - # TODO: Add boolean for tasks to include `local users` patterns. - # It appears that for task lists, directory users do not have domains. - # A problem could arise where the customer uses a local user and directory with the same name. - check_list = [user, f".\\{user}"] - if name is not None: - name = name.lower() - check_list += [name, f".\\{name}"] - if source is not None: - source = source.lower() - check_list.append(f"{source[:15]}\\{user}") - check_list.append(f"{user}@{source}") - netbios_parts = source.split(".") - if len(netbios_parts) > 1: - check_list.append(f"{netbios_parts[0][:15]}\\{user}") - check_list.append(f"{user}@{netbios_parts[0]}") - if domain is not None: - domain = domain.lower() - check_list.append(f"{domain[:15]}\\{user}") - check_list.append(f"{user}@{domain}") - domain_parts = domain.split(".") - if len(domain_parts) > 1: - check_list.append(f"{domain_parts[0][:15]}\\{user}") - check_list.append(f"{user}@{domain_parts[0]}") - - return list(set(check_list)) - - -def user_in_lookup(user: str, lookup: dict, name: Optional[str] = None, source: Optional[str] = None) -> bool: - - for check_user in user_check_list(user, name, source): - if check_user in lookup: - return True - return False - - def find_user_vertex(graph: DAG, user: str, domain: Optional[str] = None) -> Optional[DAGVertex]: user_vertices = graph.search_content({"record_type": PAM_USER}) diff --git a/keepercommander/keeper_dag/__version__.py b/keepercommander/keeper_dag/__version__.py index 394531931..a98f9837e 100644 --- a/keepercommander/keeper_dag/__version__.py +++ b/keepercommander/keeper_dag/__version__.py @@ -1 +1 @@ -__version__ = '1.1.6' # pragma: no cover +__version__ = '1.1.9' # pragma: no cover diff --git a/keepercommander/keeper_dag/connection/local.py b/keepercommander/keeper_dag/connection/local.py index 96c008ff6..e8f2e79dc 100644 --- a/keepercommander/keeper_dag/connection/local.py +++ b/keepercommander/keeper_dag/connection/local.py @@ -86,6 +86,7 @@ def get_key_bytes(record: object) -> bytes: def clear_database(self): try: + print(f"remove DAG file as {self.db_file}") os.unlink(self.db_file) except (Exception,): pass @@ -195,6 +196,8 @@ def _find_stream_id(self, payload: DataPayload): # First check if we can route with existing edges in the database. stream_id = None + if not os.path.exists(self.db_file): + raise Exception(f"Cannot find local DAG as {self.db_file}") with closing(sqlite3.connect(self.db_file)) as connection: with closing(connection.cursor()) as cursor: diff --git a/keepercommander/keeper_dag/dag.py b/keepercommander/keeper_dag/dag.py index a2d6ddfed..81a75339c 100644 --- a/keepercommander/keeper_dag/dag.py +++ b/keepercommander/keeper_dag/dag.py @@ -93,8 +93,7 @@ def __init__(self, if logger is None: logger = logging.getLogger() self.logger = logger - if debug_level is None: - debug_level = int(os.environ.get("GS_DEBUG_LEVEL", os.environ.get("DAG_DEBUG_LEVEL", 0))) + self.debug_level = int(os.environ.get("GS_DEBUG_LEVEL", os.environ.get("DAG_DEBUG_LEVEL", debug_level))) # Prevent duplicate edges to be added. # The goal is to prevent unneeded edges. @@ -106,7 +105,6 @@ def __init__(self, raise Exception("Cannot run dedup_edge and auto_save at the same time. The dedup_edge feature only works " "in bulk saves.") - self.debug_level = debug_level self.log_prefix = log_prefix if save_batch_count is None or save_batch_count <= 0: @@ -1105,8 +1103,8 @@ def _add_data(vertex): self.debug(f"{data.ref.value} -> {data.parentRef.value} ({data.type})") self.debug("##############################################") - self.debug(f"total list has {len(data_list)} items", level=0) - self.debug(f"batch {self.save_batch_count} edges", level=0) + self.debug(f"total list has {len(data_list)} items", level=1) + self.debug(f"batch {self.save_batch_count} edges", level=1) batch_num = 0 while len(data_list) > 0: @@ -1126,7 +1124,7 @@ def _add_data(vertex): if len(batch_list) == 0: break - self.debug(f"adding {len(batch_list)} edges, batch {batch_num}", level=0) + self.debug(f"adding {len(batch_list)} edges, batch {batch_num}", level=1) payload = self.write_struct_obj.payload( origin_ref=self.write_struct_obj.origin_ref( diff --git a/keepercommander/keeper_dag/edge.py b/keepercommander/keeper_dag/edge.py index eae576121..ccc0faaa1 100644 --- a/keepercommander/keeper_dag/edge.py +++ b/keepercommander/keeper_dag/edge.py @@ -3,14 +3,15 @@ from .types import EdgeType from .exceptions import DAGContentException import json -from typing import Optional, Union, Any, TYPE_CHECKING +from typing import Optional, Union, Any, TYPE_CHECKING, TypeVar, Type if TYPE_CHECKING: # pragma: no cover from .vertex import DAGVertex Content = Union[str, bytes, dict] QueryValue = Union[list, dict, str, float, int, bool] - import pydantic - from pydantic import BaseModel + + +T = TypeVar('T') class DAGEdge: @@ -159,8 +160,7 @@ def content_as_str(self) -> Optional[str]: pass return content - def content_as_object(self, - meta_class: pydantic._internal._model_construction.ModelMetaclass) -> Optional[BaseModel]: + def content_as_object(self, meta_class: Type[T]) -> Optional[T]: """ Get the content as a pydantic based object. diff --git a/keepercommander/keeper_dag/vertex.py b/keepercommander/keeper_dag/vertex.py index 989bdf2c2..48c45d0c8 100644 --- a/keepercommander/keeper_dag/vertex.py +++ b/keepercommander/keeper_dag/vertex.py @@ -3,14 +3,15 @@ from .types import EdgeType, RefType from .crypto import generate_random_bytes, generate_uid_str, urlsafe_str_to_bytes from .exceptions import DAGDeletionException, DAGIllegalEdgeException, DAGVertexException, DAGKeyException -from typing import Optional, Union, List, Any, Tuple, TYPE_CHECKING +from typing import Optional, Union, List, Any, Tuple, TYPE_CHECKING, TypeVar, Type if TYPE_CHECKING: from .dag import DAG Content = Union[str, bytes, dict] QueryValue = Union[list, dict, str, float, int, bool] - import pydantic - from pydantic import BaseModel + + +T = TypeVar('T') class DAGVertex: @@ -489,8 +490,7 @@ def content_as_str(self) -> Optional[str]: return None return data_edge.content_as_str - def content_as_object(self, - meta_class: pydantic._internal._model_construction.ModelMetaclass) -> Optional[BaseModel]: + def content_as_object(self, meta_class: Type[T]) -> Optional[T]: """ Get the content as a pydantic based object. @@ -798,7 +798,7 @@ def _delete(vertex, prior_vertex): self.debug(f" * vertex is root, cannot delete root", level=2) return - self.debug(f"> checking vertex {vertex.uid}") + self.debug(f"> checking vertex {vertex.uid}", level=1) # Should we ignore a vertex? # If deleting an edge, we want to ignore the vertex that owns the edge. @@ -821,7 +821,7 @@ def _delete(vertex, prior_vertex): if e.edge_type != EdgeType.DATA and (prior_vertex is None or e.head_uid == prior_vertex.uid): e.delete() if vertex.belongs_to_a_vertex is False: - self.debug(f" * inactive vertex {vertex.uid}") + self.debug(f" * inactive vertex {vertex.uid}", level=1) vertex.active = False self.debug(f"DELETING vertex {self.uid}", level=3)