diff --git a/docs/PERMISSION_MGMT.md b/docs/PERMISSION_MGMT.md index 6fba02a..c74eb51 100644 --- a/docs/PERMISSION_MGMT.md +++ b/docs/PERMISSION_MGMT.md @@ -1,34 +1,41 @@ # GD Workspace Permission Management + Tool which helps manage user/userGroup bound workspace permissions within GoodData organization. Goal of the tool is to help manage state of the user-workspace or userGroup-workspace permission pairs in a granular fashion (one input row per each permission - e.g. `user_1 - ws_id_1 - "ANALYZE"`). - ## Usage The tool requires the following argument on input: + - `perm_csv` - a path to a csv file defining workspace permissions bound to specific ws_id-user or ws_id-userGroup pairs and the permissions isActive state Some other, _optional_, arguments are: + - `-d | --delimiter` - column delimiter for the csv files. Use this to define how the csv is parsed. Default value is "`,`" Use the tool like so: + ```sh python scripts/permission_mgmt.py perm_csv ``` + Where `perm_csv` refers to input csv. If you would like to define custom delimiter, use the tool like so: + ```sh python scripts/permission_mgmt.py perm_csv -d "," ``` To show the help for using arguments, call: + ```sh python scripts/permission_mgmt.py -h ``` ## Input CSV file (perm_csv) + The input CSV file defines the workspace permissions which you might want to manage. [Example input csv.](examples/permission_mgmt/input.csv) @@ -36,7 +43,7 @@ The input CSV file defines the workspace permissions which you might want to man Following format of the csv is expected: | user_id | ug_id | ws_id | ws_permissions | is_active | -|---------|-------|---------|----------------|-----------| +| ------- | ----- | ------- | -------------- | --------- | | user_1 | | ws_id_1 | ANALYZE | True | | user_1 | | ws_id_1 | VIEW | False | | user_1 | | ws_id_2 | MANAGE | True | @@ -52,4 +59,4 @@ Here, each `user_id` is the ID of the user to manage, and `ug_id` is the ID of t The `ws_id` is the workspace ID that the permission is bound to. -Lastly, the `is_active` field contains information about whether the permission should or should not exist in the organization. The `is_active` field is case insensitive and considers `true` as the only value taken as positive. Any other value in this field is considered negative (e.g.: `blabla` would evaluate to `False`). +Lastly, the `is_active` field holds boolean values containing information about whether the permission should or should not exist in the organization. diff --git a/scripts/permission_mgmt.py b/scripts/permission_mgmt.py index 6275d3a..c4f97a6 100644 --- a/scripts/permission_mgmt.py +++ b/scripts/permission_mgmt.py @@ -1,29 +1,20 @@ # (C) 2025 GoodData Corporation import argparse -import csv import logging import os -import sys -from dataclasses import dataclass from pathlib import Path -from typing import Any, Iterator, Optional, TypeAlias -import gooddata_sdk as gd_sdk -from gooddata_api_client.exceptions import NotFoundException - -USER_TYPE = "user" -USER_GROUP_TYPE = "userGroup" - -PROFILES_FILE = "profiles.yaml" -PROFILES_DIRECTORY = ".gooddata" -PROFILES_FILE_PATH = Path.home() / PROFILES_DIRECTORY / PROFILES_FILE -LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s" - -logger = logging.getLogger(__name__) -handler = logging.StreamHandler(sys.stdout) -handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT)) -logger.addHandler(handler) -logger.setLevel(logging.INFO) +from gooddata_pipelines import ( + EntityType, + PermissionIncrementalLoad, + PermissionProvisioner, +) +from gooddata_sdk.utils import PROFILES_FILE_PATH +from utils.logger import setup_logging # type: ignore[import] +from utils.utils import ( # type: ignore[import] + create_provisioner, + read_csv_file_to_dict, +) def create_parser() -> argparse.ArgumentParser: @@ -63,384 +54,51 @@ def create_parser() -> argparse.ArgumentParser: return parser -TargetsPermissionDict: TypeAlias = dict[str, dict[str, bool]] - - -@dataclass(frozen=True) -class WSPermission: - permission: str - ws_id: str - id: str - type: str - is_active: bool - - @classmethod - def from_csv_row(cls, row: list[Any]) -> "WSPermission": - """Construct WSPermission data object from csv row input.""" - user_id, user_group_id, ws_id, permission, is_active = row - - id = user_id if user_id else user_group_id - target_type = USER_TYPE if user_id else USER_GROUP_TYPE - - return WSPermission( - permission=permission, - ws_id=ws_id, - id=id, - type=target_type, - is_active=str(is_active).lower() == "true", - ) - - -@dataclass -class WSPermissionDeclaration: - users: TargetsPermissionDict - user_groups: TargetsPermissionDict - - @classmethod - def from_sdk_api( - cls, declaration: gd_sdk.CatalogDeclarativeWorkspacePermissions - ) -> "WSPermissionDeclaration": - """ - Constructs an WSPermissionDeclaration instance - from GoodData SDK CatalogDeclarativeWorkspacePermissions. - """ - users: TargetsPermissionDict = {} - user_groups: TargetsPermissionDict = {} - - for permission in declaration.permissions: - permission_type, id = permission.assignee.type, permission.assignee.id - target_dict = users if permission_type == USER_TYPE else user_groups - - id_permissions = target_dict.get(id) - if not id_permissions: - target_dict[id] = dict() - - target_dict[id][permission.name] = True - - return WSPermissionDeclaration(users, user_groups) +def read_permissions_from_csv( + args: argparse.Namespace, +) -> list[PermissionIncrementalLoad]: + """Reads permissions from the input csv file.""" + validated_permissions: list[PermissionIncrementalLoad] = [] + raw_permissions = read_csv_file_to_dict(args.perm_csv, args.delimiter) - @staticmethod - def _construct_upstream_permission( - permission: str, assignee: gd_sdk.CatalogAssigneeIdentifier - ) -> gd_sdk.CatalogDeclarativeSingleWorkspacePermission | None: - """Constructs single permission declaration for the SDK API.""" + for raw_permission in raw_permissions: try: - return gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name=permission, assignee=assignee - ) - except Exception as e: - logger.error( - "Failed to construct SDK declaration " - f'for type={assignee.type} id={assignee.id}. Error: "{e}".' - ) - return None - - def _permissions_for_target( - self, permissions: dict[str, bool], assignee: gd_sdk.CatalogAssigneeIdentifier - ) -> Iterator[gd_sdk.CatalogDeclarativeSingleWorkspacePermission]: - """Constructs permission declarations for a single target.""" - for permission, is_active in permissions.items(): - if not is_active: - continue - declaration = self._construct_upstream_permission(permission, assignee) - if not declaration: - continue - yield declaration - - def to_sdk_api(self) -> gd_sdk.CatalogDeclarativeWorkspacePermissions: - """ - Constructs the GoodData SDK CatalogDeclarativeWorkspacePermissions - object from the WSPermissionDeclaration instance. - """ - permission_declarations: list[ - gd_sdk.CatalogDeclarativeSingleWorkspacePermission - ] = [] - - for user_id, permissions in self.users.items(): - assignee = gd_sdk.CatalogAssigneeIdentifier(id=user_id, type=USER_TYPE) - for declaration in self._permissions_for_target(permissions, assignee): - permission_declarations.append(declaration) - - for ug_id, permissions in self.user_groups.items(): - assignee = gd_sdk.CatalogAssigneeIdentifier(id=ug_id, type=USER_GROUP_TYPE) - for declaration in self._permissions_for_target(permissions, assignee): - permission_declarations.append(declaration) - - return gd_sdk.CatalogDeclarativeWorkspacePermissions( - permissions=permission_declarations - ) - - def add_permission(self, permission: WSPermission): - """ - Adds WSPermission object into respective field within the instance. - Handles duplicate permissions and different combinations of input - and upstream is_active permission states. - """ - target_dict = self.users if permission.type == USER_TYPE else self.user_groups - - if permission.id not in target_dict: - target_dict[permission.id] = {} - - is_active = permission.is_active - target_permissions = target_dict[permission.id] - permission_value = permission.permission - - if permission_value not in target_permissions: - target_permissions[permission_value] = is_active - elif not is_active and target_permissions[permission_value] is True: - logger.warning( - "isActive=False provided after True has been specificed " - f'for the same input. Skipping "{permission}".' - ) - elif is_active and target_permissions[permission_value] is False: - logger.warning( - "isActive=True provided after False has been specified " - f'for the same input. Overwriting "{permission}".' - ) - target_permissions[permission_value] = is_active - - def upsert(self, other: "WSPermissionDeclaration"): - """ - Modifies the owner object by merging with the other. - Keeps the unmodified users/userGroups untouched. - If some user/userGroup is modified, it gets overwritten with permissions - defined in the input. - """ - for user_id, permissions in other.users.items(): - self.users[user_id] = permissions - - for ug_id, permissions in other.user_groups.items(): - self.user_groups[ug_id] = permissions - - -WSPermissionsDeclarations: TypeAlias = dict[str, WSPermissionDeclaration] - - -class InvalidPermissionException(Exception): - pass - - -class WSPermissionManager: - def __init__(self, sdk: gd_sdk.GoodDataSdk): - self._sdk = sdk - - def _get_ws_declaration(self, ws_id: str) -> WSPermissionDeclaration: - users: TargetsPermissionDict = {} - user_groups: TargetsPermissionDict = {} - - upstream_declaration = self._sdk.catalog_permission.get_declarative_permissions( - ws_id - ) - - for permission in upstream_declaration.permissions: - permission_type, id = permission.assignee.type, permission.assignee.id - target_dict = users if permission_type == USER_TYPE else user_groups - - id_permissions = target_dict.get(id) - if not id_permissions: - target_dict[id] = dict() - - target_dict[id][permission.name] = True + if raw_permission["user_id"] and raw_permission["ug_id"]: + raise RuntimeError( + "UserID and UserGroupID are mutually exclusive per csv row. " + f'Skipping following row: "{raw_permission}".' + ) - return WSPermissionDeclaration(users, user_groups) + entity_id = raw_permission["user_id"] or raw_permission["ug_id"] + if not entity_id: + raise RuntimeError( + "Either UserID or UserGroupID have to be defined per csv row. " + f'Skipping following row: "{raw_permission}".' + ) - def _get_upstream_declaration( - self, ws_id: str - ) -> Optional[WSPermissionDeclaration]: - """Retrieves upstream permission declaration for a workspace.""" - try: - declaration = self._sdk.catalog_permission.get_declarative_permissions( - ws_id + if raw_permission["user_id"]: + entity_type = EntityType.user + else: + entity_type = EntityType.user_group + + validated_permission = PermissionIncrementalLoad( + permission=raw_permission["ws_permissions"], + workspace_id=raw_permission["ws_id"], + entity_id=entity_id, + entity_type=entity_type, + is_active=raw_permission["is_active"], ) - return WSPermissionDeclaration.from_sdk_api(declaration) - except NotFoundException as e: - logger.error(f"Workspace with id {ws_id} doesn't exist. Error: {e}") + validated_permissions.append(validated_permission) + except KeyError as e: + logger.error(f"Missing key in following row: {raw_permission}. Error: {e}") + continue except Exception as e: logger.error( - "Some error occured while retrieving workspace " - f'permission declaration for workspace "{ws_id}". Error: "{e}"' + f'Unable to load following row: "{raw_permission}". Error: "{e}"' ) - return None + continue - def _get_upstream_declarations( - self, input_ws_ids: list[str] - ) -> WSPermissionsDeclarations: - """Retrieves upstream permission declarations for a list of workspaces.""" - ws_dict: WSPermissionsDeclarations = {} - for ws_id in input_ws_ids: - declaration = self._get_upstream_declaration(ws_id) - if declaration: - ws_dict[ws_id] = declaration - return ws_dict - - @staticmethod - def _construct_declarations( - permissions: list[WSPermission], - ) -> WSPermissionsDeclarations: - """Constructs workspace permission declarations from the input permissions.""" - ws_dict: WSPermissionsDeclarations = {} - for permission in permissions: - ws_id = permission.ws_id - - if ws_id not in ws_dict: - ws_dict[ws_id] = WSPermissionDeclaration({}, {}) - - ws_dict[ws_id].add_permission(permission) - return ws_dict - - def _check_user_exists(self, user_id: str): - """Checks if user with provided ID exists.""" - try: - self._sdk.catalog_user.get_user(user_id) - except NotFoundException: - raise InvalidPermissionException("Provided user ID does not exist.") - - def _check_user_group_exists(self, ug_id: str): - """Checks if user group with provided ID exists.""" - try: - self._sdk.catalog_user.get_user_group(ug_id) - except NotFoundException: - raise InvalidPermissionException("Provided user group ID does not exist.") - - def _validate_permission(self, permission: WSPermission): - """Validates if the permission is correctly defined.""" - if permission.type == USER_TYPE: - self._check_user_exists(permission.id) - else: - self._check_user_group_exists(permission.id) - - def _filter_invalid_permissions( - self, permissions: list[WSPermission] - ) -> list[WSPermission]: - """Filters out invalid permissions from the input list.""" - valid_permissions: list[WSPermission] = [] - for permission in permissions: - try: - self._validate_permission(permission) - except InvalidPermissionException as e: - logger.error( - f'Invalid permission defined. Skipping "{permission}. Error: "{e}".' - ) - continue - valid_permissions.append(permission) - return valid_permissions - - def manage_permissions(self, permissions: list[WSPermission]): - """Manages permissions for a list of workspaces. - Modify upstream workspace declarations for each input workspace and skip non-existent ws_ids - """ - logger.info( - f"Starting permission management run of {len(permissions)} permissions..." - ) - valid_permissions = self._filter_invalid_permissions(permissions) - - input_declarations = self._construct_declarations(valid_permissions) - - input_ws_ids = list(input_declarations.keys()) - upstream_declarations = self._get_upstream_declarations(input_ws_ids) - - for ws_id, declaration in input_declarations.items(): - if ws_id not in upstream_declarations: - continue - - upstream_declarations[ws_id].upsert(declaration) - - ws_permissions = upstream_declarations[ws_id].to_sdk_api() - - logger.info(f'Putting declarative permissions for workspace "{ws_id}".') - try: - self._sdk.catalog_permission.put_declarative_permissions( - ws_id, ws_permissions - ) - except Exception as e: - logger.error( - "Failed to update declarative workspace " - f'permissions for workspace "{ws_id}". Error: {e}' - ) - logger.info("Finished permission management run.") - - -def csv_row_is_valid(row: list[Any]) -> bool: - """Validates if the csv row is correctly defined.""" - try: - user_id, user_group_id, ws_id, permission, is_active = row - except Exception as e: - logger.error( - "Unable to parse csv row. " - "Most probably an incorrect amount of values was defined. " - f'Skipping following row: "{row}". Error: "{e}".' - ) - return False - - if user_id and user_group_id: - logger.error( - "UserID and UserGroupID are mutually exclusive per csv row. " - f'Skipping following row: "{row}".' - ) - return False - - if not user_id and not user_group_id: - logger.error( - "Either UserID or UserGroupID have to be defined per csv row. " - f'Skipping following row: "{row}".' - ) - return False - - if not ws_id: - logger.error(f'ws_id field seems to be empty. Skipping following row: "{row}".') - return False - - if not permission: - logger.error( - f'permission field seems to be empty. Skipping following row: "{row}".' - ) - return False - - if not is_active: - logger.error( - f'is_active field seems to be empty. Skipping following row: "{row}".' - ) - return False - - return True - - -def read_permissions_from_csv(csv_path: str) -> list[WSPermission]: - """Reads permissions from the input csv file.""" - permissions: list[WSPermission] = [] - with open(csv_path, "r") as f: - reader = csv.reader(f, skipinitialspace=True) - next(reader) # Skip header - for row in reader: - if not csv_row_is_valid(row): - continue - try: - permission = WSPermission.from_csv_row(row) - except Exception as e: - logger.error(f'Unable to load following row: "{row}". Error: "{e}"') - continue - permissions.append(permission) - return permissions - - -def create_client(args: argparse.Namespace) -> gd_sdk.GoodDataSdk: - """Creates GoodData SDK client based on the input arguments.""" - gdc_auth_token = os.environ.get("GDC_AUTH_TOKEN") - gdc_hostname = os.environ.get("GDC_HOSTNAME") - - if gdc_hostname and gdc_auth_token: - logger.info("Using GDC_HOSTNAME and GDC_AUTH_TOKEN envvars.") - return gd_sdk.GoodDataSdk.create(gdc_hostname, gdc_auth_token) - - profile_config, profile = args.profile_config, args.profile - if os.path.exists(profile_config): - logger.info(f"Using GoodData profile {profile} sourced from {profile_config}.") - return gd_sdk.GoodDataSdk.create_from_profile(profile, profile_config) - - raise RuntimeError( - "No GoodData credentials provided. Please export required ENVVARS " - "(GDC_HOSTNAME, GDC_AUTH_TOKEN) or provide path to GD profile config." - ) + return validated_permissions def validate_args(args: argparse.Namespace) -> None: @@ -451,16 +109,19 @@ def validate_args(args: argparse.Namespace) -> None: ) -def permission_mgmt(args): - """Main function for the permission management script.""" - validate_args(args) - permissions = read_permissions_from_csv(args.perm_csv) - sdk = create_client(args) - permission_manager = WSPermissionManager(sdk) - permission_manager.manage_permissions(permissions) - - if __name__ == "__main__": parser = create_parser() args = parser.parse_args() - permission_mgmt(args) + + setup_logging(args.verbose) + logger = logging.getLogger(__name__) + + permissions = read_permissions_from_csv(args) + + permission_manager = create_provisioner( + PermissionProvisioner, args.profile_config, args.profile + ) + + permission_manager.logger.subscribe(logger) + + permission_manager.incremental_load(permissions) diff --git a/scripts/utils/logger.py b/scripts/utils/logger.py index 0e670fc..490f7ee 100644 --- a/scripts/utils/logger.py +++ b/scripts/utils/logger.py @@ -58,7 +58,7 @@ def emit(self, record: logging.LogRecord) -> None: self.file_handler.emit(record) -def setup_logging() -> None: +def setup_logging(verbose: bool = False) -> None: """ Sets up logging configuration for the root logger. Terminal logs will be formatted with colors based on the log level. @@ -66,6 +66,9 @@ def setup_logging() -> None: `_.log` in the current working directory. """ root_logger = logging.getLogger() - root_logger.setLevel(logging.INFO) + + min_level = logging.DEBUG if verbose else logging.INFO + + root_logger.setLevel(min_level) root_logger.handlers.clear() root_logger.addHandler(LogHandler()) diff --git a/tests/test_permissions.py b/tests/test_permissions.py deleted file mode 100644 index 4d50c5f..0000000 --- a/tests/test_permissions.py +++ /dev/null @@ -1,373 +0,0 @@ -# (C) 2025 GoodData Corporation -import argparse -from unittest import mock - -import gooddata_sdk as gd_sdk -from gooddata_api_client.exceptions import NotFoundException - -from scripts import permission_mgmt - -TEST_CSV_PATH = "tests/data/permission_mgmt/input.csv" - -USER_1 = gd_sdk.CatalogAssigneeIdentifier(id="user_1", type="user") -USER_2 = gd_sdk.CatalogAssigneeIdentifier(id="user_2", type="user") -USER_3 = gd_sdk.CatalogAssigneeIdentifier(id="user_3", type="user") -UG_1 = gd_sdk.CatalogAssigneeIdentifier(id="ug_1", type="userGroup") -UG_2 = gd_sdk.CatalogAssigneeIdentifier(id="ug_2", type="userGroup") -UG_3 = gd_sdk.CatalogAssigneeIdentifier(id="ug_3", type="userGroup") - -UPSTREAM_PERMISSIONS = [ - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="ANALYZE", assignee=USER_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="VIEW", assignee=USER_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="MANAGE", assignee=USER_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="ANALYZE", assignee=USER_2), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="VIEW", assignee=USER_2), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="ANALYZE", assignee=USER_3), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="ANALYZE", assignee=UG_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="VIEW", assignee=UG_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="MANAGE", assignee=UG_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="ANALYZE", assignee=UG_2), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="VIEW", assignee=UG_2), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="ANALYZE", assignee=UG_3), -] - -WS_PERMISSION_DECLARATION = permission_mgmt.WSPermissionDeclaration( - users={ - "user_1": {"ANALYZE": True, "VIEW": True, "MANAGE": True}, - "user_2": {"ANALYZE": True, "VIEW": True}, - "user_3": {"ANALYZE": True}, - }, - user_groups={ - "ug_1": {"ANALYZE": True, "VIEW": True, "MANAGE": True}, - "ug_2": {"ANALYZE": True, "VIEW": True}, - "ug_3": {"ANALYZE": True}, - }, -) - -UPSTREAM_WS_PERMISSION = gd_sdk.CatalogDeclarativeWorkspacePermissions( - permissions=UPSTREAM_PERMISSIONS -) - -UPSTREAM_WS_PERMISSIONS = { - "ws_id_1": UPSTREAM_WS_PERMISSION, - "ws_id_2": UPSTREAM_WS_PERMISSION, -} - -EXPECTED_WS1_PERMISSIONS = gd_sdk.CatalogDeclarativeWorkspacePermissions( - permissions=[ - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=USER_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="VIEW", assignee=USER_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=USER_2 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="MANAGE", assignee=USER_2 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=USER_3 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=UG_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission(name="VIEW", assignee=UG_1), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=UG_2 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="MANAGE", assignee=UG_2 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=UG_3 - ), - ] -) - -EXPECTED_WS2_PERMISSIONS = gd_sdk.CatalogDeclarativeWorkspacePermissions( - permissions=[ - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="MANAGE", assignee=USER_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="MANAGE", assignee=USER_3 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="MANAGE", assignee=UG_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="MANAGE", assignee=UG_3 - ), - ] -) - - -def test_declaration_from_populated_sdk_api_obj(): - declaration = permission_mgmt.WSPermissionDeclaration.from_sdk_api( - UPSTREAM_WS_PERMISSION - ) - assert declaration == WS_PERMISSION_DECLARATION - - -def test_declaration_from_empty_sdk_api_obj(): - api_obj = gd_sdk.CatalogDeclarativeWorkspacePermissions(permissions=[]) - declaration = permission_mgmt.WSPermissionDeclaration.from_sdk_api(api_obj) - assert len(declaration.users) == 0 - assert len(declaration.user_groups) == 0 - - -def test_declaration_to_populated_sdk_api_obj(): - api_obj = permission_mgmt.WSPermissionDeclaration.to_sdk_api( - WS_PERMISSION_DECLARATION - ) - assert api_obj == UPSTREAM_WS_PERMISSION - - -def test_declaration_with_inactive_to_sdk_api_obj(): - users = { - "user_1": {"ANALYZE": True, "VIEW": False}, - "user_2": {"ANALYZE": True}, - } - ugs = { - "ug_1": {"ANALYZE": True, "VIEW": False}, - "ug_2": {"ANALYZE": True}, - } - declaration = permission_mgmt.WSPermissionDeclaration(users, ugs) - api_obj = declaration.to_sdk_api() - expected = gd_sdk.CatalogDeclarativeWorkspacePermissions( - permissions=[ - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=USER_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=USER_2 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=UG_1 - ), - gd_sdk.CatalogDeclarativeSingleWorkspacePermission( - name="ANALYZE", assignee=UG_2 - ), - ] - ) - assert api_obj == expected - - -def test_declaration_with_only_inactive_to_sdk_api_obj(): - users = { - "user_1": {"ANALYZE": False, "VIEW": False}, - "user_2": {"ANALYZE": False}, - } - ugs = { - "ug_1": {"ANALYZE": False, "VIEW": False}, - "ug_2": {"ANALYZE": False}, - } - declaration = permission_mgmt.WSPermissionDeclaration(users, ugs) - api_obj = declaration.to_sdk_api() - expected = gd_sdk.CatalogDeclarativeWorkspacePermissions(permissions=[]) - assert api_obj == expected - - -# Declarations are explicitly defined anew here to avoid dict mutations -# in subsequent calls and to avoid dict deepcopy overhead. - - -def test_add_new_active_user_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("MANAGE", "", "user_1", "user", True) - declaration.add_permission(permission) - assert declaration.users == { - "user_1": {"ANALYZE": True, "VIEW": False, "MANAGE": True} - } - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -def test_add_new_inactive_user_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("MANAGE", "", "user_1", "user", False) - declaration.add_permission(permission) - assert declaration.users == { - "user_1": {"ANALYZE": True, "VIEW": False, "MANAGE": False} - } - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -def test_overwrite_inactive_user_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("VIEW", "", "user_1", "user", True) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": True}} - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -def test_overwrite_active_user_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("ANALYZE", "", "user_1", "user", False) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -def test_add_new_user_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("VIEW", "", "user_2", "user", True) - declaration.add_permission(permission) - assert declaration.users == { - "user_1": {"ANALYZE": True, "VIEW": False}, - "user_2": {"VIEW": True}, - } - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -def test_modify_one_of_user_perms(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}, "user_2": {"VIEW": True}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("MANAGE", "", "user_1", "user", True) - declaration.add_permission(permission) - assert declaration.users == { - "user_1": {"ANALYZE": True, "VIEW": False, "MANAGE": True}, - "user_2": {"VIEW": True}, - } - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -# Add userGroup permission - - -def test_add_new_active_ug_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("MANAGE", "", "ug_1", "userGroup", True) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == { - "ug_1": {"VIEW": True, "ANALYZE": False, "MANAGE": True} - } - - -def test_add_new_inactive_ug_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("MANAGE", "", "ug_1", "userGroup", False) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == { - "ug_1": {"VIEW": True, "ANALYZE": False, "MANAGE": False} - } - - -def test_overwrite_inactive_ug_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("ANALYZE", "", "ug_1", "userGroup", True) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": True}} - - -def test_overwrite_active_ug_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("VIEW", "", "ug_1", "userGroup", False) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == {"ug_1": {"VIEW": True, "ANALYZE": False}} - - -def test_add_new_ug_perm(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}}, - ) - permission = permission_mgmt.WSPermission("VIEW", "", "ug_2", "userGroup", True) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == { - "ug_1": {"VIEW": True, "ANALYZE": False}, - "ug_2": {"VIEW": True}, - } - - -def test_modify_one_of_ug_perms(): - declaration = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True, "VIEW": False}}, - {"ug_1": {"VIEW": True, "ANALYZE": False}, "ug_2": {"VIEW": True}}, - ) - permission = permission_mgmt.WSPermission("MANAGE", "", "ug_1", "userGroup", True) - declaration.add_permission(permission) - assert declaration.users == {"user_1": {"ANALYZE": True, "VIEW": False}} - assert declaration.user_groups == { - "ug_1": {"VIEW": True, "ANALYZE": False, "MANAGE": True}, - "ug_2": {"VIEW": True}, - } - - -def test_upsert(): - owner = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"ANALYZE": True}, "user_2": {"VIEW": True}}, - {"ug_1": {"ANALYZE": True}, "ug_2": {"VIEW": True}}, - ) - other = permission_mgmt.WSPermissionDeclaration( - {"user_1": {"MANAGE": True, "VIEW": False}}, - {"ug_2": {"MANAGE": True, "VIEW": False}}, - ) - owner.upsert(other) - assert owner.users == { - "user_1": {"MANAGE": True, "VIEW": False}, - "user_2": {"VIEW": True}, - } - assert owner.user_groups == { - "ug_1": {"ANALYZE": True}, - "ug_2": {"MANAGE": True, "VIEW": False}, - } - - -def mock_upstream_perms(ws_id: str) -> gd_sdk.CatalogDeclarativeWorkspacePermissions: - if ws_id not in UPSTREAM_WS_PERMISSIONS: - raise NotFoundException(404) - return UPSTREAM_WS_PERMISSIONS[ws_id] - - -@mock.patch("scripts.permission_mgmt.create_client") -def test_permission_management_e2e(create_client): - sdk = mock.Mock() - sdk.catalog_permission.get_declarative_permissions.side_effect = mock_upstream_perms - create_client.return_value = sdk - - args = argparse.Namespace(perm_csv=TEST_CSV_PATH, verbose=False) - - permission_mgmt.permission_mgmt(args) - - sdk.catalog_permission.put_declarative_permissions.assert_has_calls( - [ - mock.call("ws_id_1", EXPECTED_WS1_PERMISSIONS), - mock.call("ws_id_2", EXPECTED_WS2_PERMISSIONS), - ] - )