From 4cdd27e514199c886c770df1e9b864e147fa74a9 Mon Sep 17 00:00:00 2001 From: janmatzek Date: Fri, 10 Oct 2025 14:12:33 +0200 Subject: [PATCH] feat: reuse custom fields implementation from gooddata-pipelines --- scripts/custom_fields.py | 66 ++-- scripts/custom_fields/__init__.py | 1 - scripts/custom_fields/api.py | 181 ----------- scripts/custom_fields/custom_field_manager.py | 240 --------------- scripts/custom_fields/input_processor.py | 281 ------------------ scripts/custom_fields/input_validator.py | 206 ------------- scripts/custom_fields/models/__init__.py | 1 - scripts/custom_fields/models/aliases.py | 10 - .../custom_fields/models/analytical_object.py | 33 -- .../models/custom_data_object.py | 88 ------ scripts/permission_mgmt.py | 4 +- scripts/user_data_filter_mgmt.py | 4 +- scripts/user_group_mgmt.py | 4 +- scripts/user_mgmt.py | 4 +- scripts/utils/utils.py | 30 +- scripts/workspace_mgmt.py | 6 +- .../response_get_all_dashboards.json | 72 ----- .../response_get_all_metrics.json | 78 ----- .../response_get_all_visualizations.json | 143 --------- tests/test_custom_fields/__init__.py | 1 - .../test_custom_fields/test_custom_fields.py | 150 ---------- .../test_input_processor.py | 175 ----------- .../test_input_validator.py | 203 ------------- .../test_models/__init__.py | 1 - .../test_models/test_analytical_object.py | 69 ----- .../test_models/test_custom_data_object.py | 107 ------- 26 files changed, 78 insertions(+), 2080 deletions(-) delete mode 100644 scripts/custom_fields/__init__.py delete mode 100644 scripts/custom_fields/api.py delete mode 100644 scripts/custom_fields/custom_field_manager.py delete mode 100644 scripts/custom_fields/input_processor.py delete mode 100644 scripts/custom_fields/input_validator.py delete mode 100644 scripts/custom_fields/models/__init__.py delete mode 100644 scripts/custom_fields/models/aliases.py delete mode 100644 scripts/custom_fields/models/analytical_object.py delete mode 100644 scripts/custom_fields/models/custom_data_object.py delete mode 100644 tests/data/custom_fields/response_get_all_dashboards.json delete mode 100644 tests/data/custom_fields/response_get_all_metrics.json delete mode 100644 tests/data/custom_fields/response_get_all_visualizations.json delete mode 100644 tests/test_custom_fields/__init__.py delete mode 100644 tests/test_custom_fields/test_custom_fields.py delete mode 100644 tests/test_custom_fields/test_input_processor.py delete mode 100644 tests/test_custom_fields/test_input_validator.py delete mode 100644 tests/test_custom_fields/test_models/__init__.py delete mode 100644 tests/test_custom_fields/test_models/test_analytical_object.py delete mode 100644 tests/test_custom_fields/test_models/test_custom_data_object.py diff --git a/scripts/custom_fields.py b/scripts/custom_fields.py index 5af43b7..fbeae25 100644 --- a/scripts/custom_fields.py +++ b/scripts/custom_fields.py @@ -6,46 +6,56 @@ """ import argparse -import os +from pathlib import Path -from custom_fields.custom_field_manager import ( # type: ignore[import] - CustomFieldManager, +from gooddata_pipelines import ( + CustomDatasetDefinition, + CustomFieldDefinition, + LdmExtensionManager, ) +from gooddata_sdk.utils import PROFILES_FILE_PATH from utils.logger import get_logger, setup_logging # type: ignore[import] -from utils.utils import read_csv_file_to_dict # type: ignore[import] +from utils.utils import ( # type: ignore[import] + create_client, + read_csv_file_to_dict, +) setup_logging() logger = get_logger(__name__) -def custome_fields() -> None: +def custom_fields() -> None: """Main function to run the custom fields script.""" - # Get host and token from environment variables - # TODO: add option to load credentials from profile - # TODO: (refactor) credentials should be handled in one place for the project - host = os.environ.get("GDC_HOSTNAME") - token = os.environ.get("GDC_AUTH_TOKEN") args: argparse.Namespace = parse_args() path_to_custom_datasets_csv = args.path_to_custom_datasets_csv path_to_custom_fields_csv = args.path_to_custom_fields_csv check_relations: bool = args.check_relations - if not host: - raise ValueError("GDC_HOSTNAME environment variable is not set.") - if not token: - raise ValueError("GDC_AUTH_TOKEN environment variable is not set.") - # Load input data from csv files - custom_datasets: list[dict[str, str]] = read_csv_file_to_dict( + raw_custom_datasets: list[dict[str, str]] = read_csv_file_to_dict( path_to_custom_datasets_csv ) - custom_fields: list[dict[str, str]] = read_csv_file_to_dict( + + custom_datasets = [ + CustomDatasetDefinition.model_validate(raw_custom_dataset) + for raw_custom_dataset in raw_custom_datasets + ] + + raw_custom_fields: list[dict[str, str]] = read_csv_file_to_dict( path_to_custom_fields_csv ) + custom_fields = [ + CustomFieldDefinition.model_validate(raw_custom_field) + for raw_custom_field in raw_custom_fields + ] + # Create instance of CustomFieldManager with host and token - manager = CustomFieldManager(host, token) + manager = create_client(LdmExtensionManager, args.profile_config, args.profile) + + # Subscribe to logs + manager.logger.subscribe(logger) # Process the custom datasets and fields manager.process(custom_datasets, custom_fields, check_relations) @@ -59,11 +69,13 @@ def parse_args(): type=str, help="Path to the CSV file containing custom datasets definitions.", ) + parser.add_argument( "path_to_custom_fields_csv", type=str, help="Path to the CSV file containing custom fields definitions.", ) + parser.add_argument( "--no-relations-check", action="store_false", @@ -73,8 +85,24 @@ def parse_args(): + "Boolean, defaults to True.", ) + parser.add_argument( + "-p", + "--profile-config", + type=Path, + default=PROFILES_FILE_PATH, + help="Optional path to GoodData profile config. " + f'If no path is provided, "{PROFILES_FILE_PATH}" is used.', + ) + + parser.add_argument( + "--profile", + type=str, + default="default", + help='GoodData profile to use. If no profile is provided, "default" is used.', + ) + return parser.parse_args() if __name__ == "__main__": - custome_fields() + custom_fields() diff --git a/scripts/custom_fields/__init__.py b/scripts/custom_fields/__init__.py deleted file mode 100644 index 37d863d..0000000 --- a/scripts/custom_fields/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# (C) 2025 GoodData Corporation diff --git a/scripts/custom_fields/api.py b/scripts/custom_fields/api.py deleted file mode 100644 index ecf9d8b..0000000 --- a/scripts/custom_fields/api.py +++ /dev/null @@ -1,181 +0,0 @@ -# (C) 2025 GoodData Corporation -import json -from typing import Any - -import requests -from gooddata_sdk.sdk import GoodDataSdk - -TIMEOUT = 60 -PANTHER_API_VERSION = "v1" - - -class GoodDataAPI: - headers: dict[str, str] - base_url: str - - def __init__(self, host: str, token: str) -> None: - """Initialize the GoodDataAPI with host and token. - - Args: - host (str): The GoodData Cloud host URL. - token (str): The authentication token for the GoodData Cloud API. - """ - self._domain: str = host - self._token: str = token - - # Initialize the GoodData SDK - self._sdk = GoodDataSdk.create(self._domain, self._token) - - # Set up utils for direct API interaction - self.base_url = self._get_base_url(self._domain) - self.headers = { - "Authorization": f"Bearer {self._token}", - "Content-Type": "application/vnd.gooddata.api+json", - } - - def get_workspace_layout(self, workspace_id: str) -> requests.Response: - """Get the layout of the specified workspace. - - Args: - workspace_id (str): The ID of the workspace to retrieve the layout for. - Returns: - requests.Response: The response containing the workspace layout. - """ - endpoint = f"/layout/workspaces/{workspace_id}" - return self._get(endpoint) - - def put_workspace_layout( - self, workspace_id: str, layout: dict[str, Any] - ) -> requests.Response: - """Update the layout of the specified workspace. - - Args: - workspace_id (str): The ID of the workspace to update. - layout (dict[str, Any]): The new layout to set for the workspace. - Returns: - requests.Response: The response from the server after updating the layout. - """ - endpoint = f"/layout/workspaces/{workspace_id}" - headers = {**self.headers, "Content-Type": "application/json"} - return self._put(endpoint, data=layout, headers=headers) - - def get_all_metrics(self, workspace_id: str) -> requests.Response: - """Get all metrics from the specified workspace. - - Args: - workspace_id (str): The ID of the workspace to retrieve metrics from. - Returns: - requests.Response: The response containing the metrics. - """ - endpoint = f"/entities/workspaces/{workspace_id}/metrics" - headers = {**self.headers, "X-GDC-VALIDATE-RELATIONS": "true"} - return self._get(endpoint, headers=headers) - - def get_all_visualization_objects(self, workspace_id: str) -> requests.Response: - """Get all visualizations from the specified workspace. - - Args: - workspace_id (str): The ID of the workspace to retrieve visualizations from. - Returns: - requests.Response: The response containing the visualizations. - """ - endpoint = f"/entities/workspaces/{workspace_id}/visualizationObjects" - headers = {**self.headers, "X-GDC-VALIDATE-RELATIONS": "true"} - return self._get(endpoint, headers=headers) - - def get_all_dashboards(self, workspace_id: str) -> requests.Response: - """Get all dashboards from the specified workspace. - - Args: - workspace_id (str): The ID of the workspace to retrieve dashboards from. - Returns: - requests.Response: The response containing the dashboards. - """ - endpoint = f"/entities/workspaces/{workspace_id}/analyticalDashboards" - headers = {**self.headers, "X-GDC-VALIDATE-RELATIONS": "true"} - return self._get(endpoint, headers=headers) - - @staticmethod - def _get_base_url(domain: str) -> str: - """Returns the root endpoint for the GoodData Cloud API. - - Method ensures that the URL starts with "https://" and does not - end with a trailing slash. - - Args: - domain (str): The domain of the GoodData Cloud instance. - Returns: - str: The base URL for the GoodData Cloud API. - """ - # Remove trailing slash if present. - if domain[-1] == "/": - domain = domain[:-1] - - if not domain.startswith("https://") and not domain.startswith("http://"): - domain = f"https://{domain}" - - if domain.startswith("http://") and not domain.startswith("https://"): - domain = domain.replace("http://", "https://") - - return f"{domain}/api/{PANTHER_API_VERSION}" - - def _get_url(self, endpoint: str) -> str: - """Returns the full URL for a given API endpoint. - - Args: - endpoint (str): The API endpoint to be appended to the base URL. - Returns: - str: The full URL for the API endpoint. - """ - return f"{self.base_url}{endpoint}" - - def _get( - self, endpoint: str, headers: dict[str, str] | None = None - ) -> requests.Response: - """Sends a GET request to the server. - - Args: - endpoint (str): The API endpoint to send the GET request to. - Returns: - requests.Response: The response from the server. - """ - url = self._get_url(endpoint) - request_headers = headers if headers else self.headers - - return requests.get(url, headers=request_headers, timeout=TIMEOUT) - - def _put( - self, - endpoint: str, - data: Any, - headers: dict | None = None, - ) -> requests.Response: - """Sends a PUT request to the server with a given JSON object. - - Args: - endpoint (str): The API endpoint to send the PUT request to. - data (Any): The JSON data to send in the request body. - headers (dict | None): Headers to include in the request. - If no headers are provided, the default headers will be used. - Returns: - requests.Response: The response from the server. - """ - url = self._get_url(endpoint) - request_headers = headers if headers else self.headers - data_json = json.dumps(data) - - return requests.put( - url, data=data_json, headers=request_headers, timeout=TIMEOUT - ) - - @staticmethod - def raise_if_response_not_ok(*responses: requests.Response) -> None: - """Check if responses from API calls are OK. - - Raises ValueError if any response is not OK (status code not 2xx). - """ - for response in responses: - if not response.ok: - raise ValueError( - f"Request to {response.url} failed with status code {response.status_code}: {response.text}" - ) diff --git a/scripts/custom_fields/custom_field_manager.py b/scripts/custom_fields/custom_field_manager.py deleted file mode 100644 index 3583b70..0000000 --- a/scripts/custom_fields/custom_field_manager.py +++ /dev/null @@ -1,240 +0,0 @@ -# (C) 2025 GoodData Corporation -"""Module orchestrating the custom fields logic.""" - -from custom_fields.api import GoodDataAPI # type: ignore[import] -from custom_fields.input_processor import ( # type: ignore[import] - CustomFieldsDataProcessor, -) -from custom_fields.input_validator import ( # type: ignore[import] - CustomFieldsDataValidator, -) -from custom_fields.models.aliases import ( # type: ignore[import] - _DatasetId, - _WorkspaceId, -) -from custom_fields.models.analytical_object import ( # type: ignore[import] - AnalyticalObject, - AnalyticalObjects, -) -from custom_fields.models.custom_data_object import ( # type: ignore[import] - CustomDataset, -) -from gooddata_sdk.sdk import GoodDataSdk - - -class CustomFieldManager: - """Manager for creating custom datasets and fields in GoodData workspaces.""" - - INDENT = " " * 2 - - def __init__(self, host: str, token: str): - self._validator = CustomFieldsDataValidator() - self._processor = CustomFieldsDataProcessor() - self._sdk = GoodDataSdk.create(host_=host, token_=token) - self._api = GoodDataAPI(host=host, token=token) - - def _get_objects_with_invalid_relations( - self, workspace_id: str - ) -> list[AnalyticalObject]: - """Check for invalid references in the provided analytical objects. - - This method checks if the references in the provided analytical objects - are valid. It returns a set of analytical objects that have invalid references. - - Args: - workspace_id (str): The ID of the workspace to check. - - Returns: - list[AnalyticalObject]: Set of analytical objects with invalid references. - """ - - analytical_objects: list[AnalyticalObject] = self._get_analytical_objects( - workspace_id=workspace_id - ) - - objects_with_invalid_relations = [ - obj for obj in analytical_objects if not obj.attributes.are_relations_valid - ] - return objects_with_invalid_relations - - def _get_analytical_objects(self, workspace_id: str) -> list[AnalyticalObject]: - """Get analytical objects in the workspace. - - This method retrieves all analytical objects (metrics, visualizations, dashboards) - in the specified workspace and returns them as a list. - - Args: - workspace_id (str): The ID of the workspace to retrieve objects from. - - Returns: - list[AnalyticalObject]: List of analytical objects in the workspace. - """ - metrics_response = self._api.get_all_metrics(workspace_id) - visualizations_response = self._api.get_all_visualization_objects(workspace_id) - dashboards_response = self._api.get_all_dashboards(workspace_id) - self._api.raise_if_response_not_ok( - metrics_response, - visualizations_response, - dashboards_response, - ) - metrics = AnalyticalObjects(**metrics_response.json()) - visualizations = AnalyticalObjects(**visualizations_response.json()) - dashboards = AnalyticalObjects(**dashboards_response.json()) - - return metrics.data + visualizations.data + dashboards.data - - @staticmethod - def _new_ldm_does_not_invalidate_relations( - current_invalid_relations: list[AnalyticalObject], - new_invalid_relations: list[AnalyticalObject], - ) -> bool: - """Check if the new LDM does not invalidate any new relations. - - This method compares the lists of analytical objects containing invalid - relations. It creates sets of object IDs for each list and compares them. - - If the set of new invalid relations is a subset of the set of current - invalid relations (that is before the changes to the LDM), the new LDM - does not invalidate any new relations and `True` is returned. - - If the set of new invalid relations is not a subset of the current one, - it means that the new LDM invalidates new relations and `False` is returned. - - Args: - current_invalid_relations (list[AnalyticalObject]): The current (before - changes to LDM) invalid relations. - new_invalid_relations (list[AnalyticalObject]): The new (after changes to - LDM) invalid relations. - - Returns: - bool: True if the new LDM does not invalidate any relations, False otherwise. - """ - # Create a set of IDs for each group, then compare those sets - set_current_invalid_relations = {obj.id for obj in current_invalid_relations} - set_new_invalid_relations = {obj.id for obj in new_invalid_relations} - - # If the set of new invalid relations is a subset of the current one, - # the new LDM does not invalidate any new relations. - if set_new_invalid_relations.issubset(set_current_invalid_relations): - return True - else: - return False - - def _process_with_relations_check( - self, validated_data: dict[_WorkspaceId, dict[_DatasetId, CustomDataset]] - ) -> None: - """Check whether relations of analytical objects are valid before and after - updating the LDM in the GoodData workspace. - """ - # Iterate through the workspaces. - for workspace_id, datasets in validated_data.items(): - print(f"⚙️ Processing workspace {workspace_id}...") - # Get current workspace layout - current_layout = self._api.get_workspace_layout(workspace_id) - # Get a set of objects with invalid relations from current workspace state - current_invalid_relations = self._get_objects_with_invalid_relations( - workspace_id=workspace_id - ) - - # Put the LDM with custom datasets into the GoodData workspace. - self._sdk.catalog_workspace_content.put_declarative_ldm( - workspace_id=workspace_id, - ldm=self._processor.datasets_to_ldm(datasets), - ) - - # Get a set of objects with invalid relations from the new workspace state - new_invalid_relations = self._get_objects_with_invalid_relations( - workspace_id=workspace_id - ) - - if self._new_ldm_does_not_invalidate_relations( - current_invalid_relations, new_invalid_relations - ): - self._print_success_message(workspace_id) - continue - - print( - f"❌ Difference in invalid relations found in workspace {workspace_id}." - ) - self._print_diff_invalid_relations( - current_invalid_relations, new_invalid_relations - ) - - print( - f"{self.INDENT}⚠️ Reverting the workspace layout to the original state." - ) - # Put the original workspace layout back to the workspace - revert_response = self._api.put_workspace_layout( - workspace_id=workspace_id, layout=current_layout.json() - ) - - if not revert_response.ok: - print(f"Failed to revert workspace layout in {workspace_id}.") - print(f"Error: {revert_response.status_code} - {revert_response.text}") - - def _print_diff_invalid_relations( - self, - current_invalid_relations: list[AnalyticalObject], - new_invalid_relations: list[AnalyticalObject], - ) -> None: - """Prints out objects with newly invalid relations. - - Objects which previously did not have invalid relations, but do so after - updating the LDM, are printed out. - """ - print(f"{self.INDENT}Objects with newly invalidated relations:") - for obj in new_invalid_relations: - if obj not in current_invalid_relations: - print(f"{self.INDENT}∙ {obj.id} ({obj.type}) {obj.attributes.title}") - - def _process_without_relations_check( - self, validated_data: dict[_WorkspaceId, dict[_DatasetId, CustomDataset]] - ) -> None: - """Update the LDM in the GoodData workspace without checking relations.""" - for workspace_id, datasets in validated_data.items(): - # Put the LDM with custom datasets into the GoodData workspace. - self._sdk.catalog_workspace_content.put_declarative_ldm( - workspace_id=workspace_id, - ldm=self._processor.datasets_to_ldm(datasets), - ) - self._print_success_message(workspace_id) - - def _print_success_message(self, workspace_id: str) -> None: - """Print a success message after updating the workspace LDM.""" - print(f"✅ LDM in {workspace_id} updated successfully.") - - def process( - self, - raw_custom_datasets: list[dict[str, str]], - raw_custom_fields: list[dict[str, str]], - check_relations: bool, - ) -> None: - """Create custom datasets and fields in GoodData workspaces. - - Creates custom datasets and fields to extend the Logical Data Model (LDM) - in GoodData workspaces based on the provided raw data definitions. The raw - data is validated by Pydantic models (CustomDatasetDefinition and CustomFieldDefinition). - The defined datasets and fields are then uploaded to GoodData Cloud. - - Args: - raw_custom_datasets (list[dict[str, str]]): List of raw custom dataset definitions. - raw_custom_fields (list[dict[str, str]]): List of raw custom field definitions. - check_relations (bool): If True, checks for invalid relations in the workspace - after updating the LDM. If the number of invalid relations increases, - the LDM will be reverted to its previous state. If False, the check - is skiped and the LDM is updated directly. Defaults to True. - - Raises: - ValueError: If there are validation errors in the dataset or field definitions. - """ - # Validate raw data and aggregate the custom field and dataset - # definitions per workspace. - validated_data: dict[_WorkspaceId, dict[_DatasetId, CustomDataset]] = ( - self._validator.validate(raw_custom_datasets, raw_custom_fields) - ) - - if check_relations: - # Process the validated data with relations check. - self._process_with_relations_check(validated_data) - else: - self._process_without_relations_check(validated_data) diff --git a/scripts/custom_fields/input_processor.py b/scripts/custom_fields/input_processor.py deleted file mode 100644 index 315b20b..0000000 --- a/scripts/custom_fields/input_processor.py +++ /dev/null @@ -1,281 +0,0 @@ -# (C) 2025 GoodData Corporation -"""Module for processing validated custom datasets and fields data. - -This module contains the `CustomFieldsDataProcessor` class, which is responsible -for converting validated custom datasets and fields into objects defined in the -GoodData Python SDK. -""" - -from custom_fields.models.aliases import _DatasetId # type: ignore[import] -from custom_fields.models.custom_data_object import ( # type: ignore[import] - ColumnDataType, - CustomDataset, - CustomFieldDefinition, - CustomFieldType, -) -from gooddata_sdk.catalog.identifier import ( - CatalogDatasetWorkspaceDataFilterIdentifier, - CatalogGrainIdentifier, - CatalogReferenceIdentifier, -) -from gooddata_sdk.catalog.workspace.declarative_model.workspace.logical_model.data_filter_references import ( - CatalogDeclarativeWorkspaceDataFilterReferences, -) -from gooddata_sdk.catalog.workspace.declarative_model.workspace.logical_model.dataset.dataset import ( - CatalogDataSourceTableIdentifier, - CatalogDeclarativeAttribute, - CatalogDeclarativeDataset, - CatalogDeclarativeDatasetSql, - CatalogDeclarativeFact, - CatalogDeclarativeReference, - CatalogDeclarativeReferenceSource, - CatalogDeclarativeWorkspaceDataFilterColumn, -) -from gooddata_sdk.catalog.workspace.declarative_model.workspace.logical_model.date_dataset.date_dataset import ( - CatalogDeclarativeDateDataset, - CatalogGranularitiesFormatting, -) -from gooddata_sdk.catalog.workspace.declarative_model.workspace.logical_model.ldm import ( - CatalogDeclarativeLdm, - CatalogDeclarativeModel, -) - - -class CustomFieldsDataProcessor: - """Create GoodData LDM from validated custom datasets and fields.""" - - DATE_GRANULARITIES: list[str] = [ - "MINUTE", - "HOUR", - "DAY", - "WEEK", - "MONTH", - "QUARTER", - "YEAR", - "MINUTE_OF_HOUR", - "HOUR_OF_DAY", - "DAY_OF_WEEK", - "DAY_OF_MONTH", - "DAY_OF_YEAR", - "WEEK_OF_YEAR", - "MONTH_OF_YEAR", - "QUARTER_OF_YEAR", - ] - - @staticmethod - def _attribute_from_field( - dataset_name: str, - custom_field: CustomFieldDefinition, - ) -> CatalogDeclarativeAttribute: - """Assign a declarative attribute from a custom field definition.""" - return CatalogDeclarativeAttribute( - id=custom_field.cf_id, - title=custom_field.cf_name, - source_column=custom_field.cf_source_column, - labels=[], - source_column_data_type=custom_field.cf_source_column_data_type.value, - tags=[dataset_name], - ) - - @staticmethod - def _fact_from_field( - dataset_name: str, - custom_field: CustomFieldDefinition, - ) -> CatalogDeclarativeFact: - """Assign a declarative fact from a custom field definition.""" - return CatalogDeclarativeFact( - id=custom_field.cf_id, - title=custom_field.cf_name, - source_column=custom_field.cf_source_column, - source_column_data_type=custom_field.cf_source_column_data_type.value, - tags=[dataset_name], - ) - - def _date_from_field( - self, - dataset_name: str, - custom_field: CustomFieldDefinition, - ) -> CatalogDeclarativeDateDataset: - """Assign a declarative date dataset from a custom field definition.""" - - return CatalogDeclarativeDateDataset( - id=custom_field.cf_id, - title=custom_field.cf_name, - granularities_formatting=CatalogGranularitiesFormatting( - title_base="", - title_pattern="%titleBase - %granularityTitle", - ), - granularities=self.DATE_GRANULARITIES, - tags=[dataset_name], - ) - - @staticmethod - def _date_ref_from_field( - custom_field: CustomFieldDefinition, - ) -> CatalogDeclarativeReference: - """Create a date reference from a custom field definition.""" - return CatalogDeclarativeReference( - identifier=CatalogReferenceIdentifier(id=custom_field.cf_id), - multivalue=False, - sources=[ - CatalogDeclarativeReferenceSource( - column=custom_field.cf_source_column, - target=CatalogGrainIdentifier( - id=custom_field.cf_id, - type=CustomFieldType.DATE.value, - ), - data_type=custom_field.cf_source_column_data_type.value, - ) - ], - ) - - @staticmethod - def _get_sources( - dataset: CustomDataset, - ) -> tuple[ - CatalogDataSourceTableIdentifier | None, CatalogDeclarativeDatasetSql | None - ]: - """Get the data source table and SQL from the dataset definition.""" - # We will have either a table id or a sql statement. Let's store - # whatever data is available to variables and pass it to the - # dataset. Both can be object instances or None, but at least one - # should be valid as per prior validation. - dataset_source_table_id = ( - CatalogDataSourceTableIdentifier( - id=dataset.definition.dataset_source_table, - data_source_id=dataset.definition.dataset_datasource_id, - path=[dataset.definition.dataset_source_table], - ) - if dataset.definition.dataset_source_table - else None - ) - - dataset_sql = ( - CatalogDeclarativeDatasetSql( - statement=dataset.definition.dataset_source_sql, - data_source_id=dataset.definition.dataset_datasource_id, - ) - if dataset.definition.dataset_source_sql - else None - ) - return dataset_source_table_id, dataset_sql - - def datasets_to_ldm( - self, datasets: dict[_DatasetId, CustomDataset] - ) -> CatalogDeclarativeModel: - """Convert validated datasets to GoodData declarative model. - - Args: - datasets (dict[DatasetId, CustomDataset]): Dictionary of validated - datasets. - Returns: - CatalogDeclarativeModel: GoodData declarative model representation - of the datasets. - """ - - declarative_datasets: list[CatalogDeclarativeDataset] = [] - - # Date dimensions are not stored in a dataset, but as a separate datasets - # in `date_instances` object on the LDM - date_instances: list[CatalogDeclarativeDateDataset] = [] - - for dataset in datasets.values(): - date_references: list[CatalogDeclarativeReference] = [] - attributes: list[CatalogDeclarativeAttribute] = [] - facts: list[CatalogDeclarativeFact] = [] - - # Iterate through the custom fields and create the appropriate objects - for custom_field in dataset.custom_fields: - if custom_field.cf_type == CustomFieldType.ATTRIBUTE: - attributes.append( - self._attribute_from_field( - dataset.definition.dataset_name, custom_field - ) - ) - - elif custom_field.cf_type == CustomFieldType.FACT: - facts.append( - self._fact_from_field( - dataset.definition.dataset_name, custom_field - ) - ) - - # Process date dimensions and store them to date_instances. Date - # dimensions are not stored in a dataset, but as a separate dataset. - # However, they need to be referenced in the dataset references to - # create the connection between the dataset and the date dimension - # in the GoodData Logical Data Model. - elif custom_field.cf_type == CustomFieldType.DATE: - # Add the date dimension to the date_instances - date_instances.append( - self._date_from_field( - dataset.definition.dataset_name, custom_field - ) - ) - - # Create a reference so that the date dimension is connected - # to the dataset in the GoodData Logical Data Model. - date_references.append(self._date_ref_from_field(custom_field)) - - else: - raise ValueError( - f"Unsupported custom field type: {custom_field.cf_type}" - ) - - # Get the data source info - dataset_source_table_id, dataset_sql = self._get_sources(dataset) - - # Construct the declarative dataset object and append it to the list. - declarative_datasets.append( - CatalogDeclarativeDataset( - id=dataset.definition.dataset_id, - title=dataset.definition.dataset_name, - grain=[], - references=[ - CatalogDeclarativeReference( - identifier=CatalogReferenceIdentifier( - id=dataset.definition.parent_dataset_reference, - ), - multivalue=True, - sources=[ - CatalogDeclarativeReferenceSource( - column=dataset.definition.dataset_reference_source_column, - data_type=dataset.definition.dataset_reference_source_column_data_type.value, - target=CatalogGrainIdentifier( - id=dataset.definition.parent_dataset_reference_attribute_id, - type=CustomFieldType.ATTRIBUTE.value, - ), - ) - ], - ), - ] - + date_references, - description=None, - attributes=attributes, - facts=facts, - data_source_table_id=dataset_source_table_id, - sql=dataset_sql, - workspace_data_filter_columns=[ - CatalogDeclarativeWorkspaceDataFilterColumn( - name=dataset.definition.wdf_column_name, - data_type=ColumnDataType.STRING.value, - ) - ], - workspace_data_filter_references=[ - CatalogDeclarativeWorkspaceDataFilterReferences( - filter_id=CatalogDatasetWorkspaceDataFilterIdentifier( - id=dataset.definition.wdf_id - ), - filter_column=dataset.definition.wdf_column_name, - filter_column_data_type=ColumnDataType.STRING.value, - ) - ], - tags=[dataset.definition.dataset_name], - ) - ) - - # Create the Logical Data Model from the datasets and the date instances. - ldm = CatalogDeclarativeLdm( - datasets=declarative_datasets, date_instances=date_instances - ) - return CatalogDeclarativeModel(ldm=ldm) diff --git a/scripts/custom_fields/input_validator.py b/scripts/custom_fields/input_validator.py deleted file mode 100644 index c56be0f..0000000 --- a/scripts/custom_fields/input_validator.py +++ /dev/null @@ -1,206 +0,0 @@ -# (C) 2025 GoodData Corporation -"""Module for validating custom fields input data. - -This module provides the `CustomFieldsDataValidator` class, which is responsible -for validating custom fields input data checking for row level and aggregated -constraints. -""" - -from collections import Counter -from typing import Any, Type, TypeVar - -from custom_fields.models.aliases import ( # type: ignore[import] - _DatasetId, - _RawData, - _WorkspaceId, -) -from custom_fields.models.custom_data_object import ( # type: ignore[import] - CustomDataset, - CustomDatasetDefinition, - CustomFieldDefinition, - CustomFieldType, -) -from pydantic import BaseModel - - -class CustomFieldsDataValidator: - ModelT = TypeVar("ModelT", bound=BaseModel) - - def validate( - self, - raw_dataset_definitions: _RawData, - raw_field_definitions: _RawData, - ) -> dict[_WorkspaceId, dict[_DatasetId, CustomDataset]]: - """Validate dataset and field definitions. - - Validates the dataset definitions and field definitions by using Pydantic - models to check row level constraints, then aggregates the definitions - per workspace, while checking for integrity on aggregated level, i.e., - uniqueness of combinations of identifieres on workspace level. - - Args: - raw_dataset_definitions (list[dict[str, str]]): List of raw dataset definitions to validate. - raw_field_definitions (list[dict[str, str]]): List of raw field definitions to validate. - Returns: - dict[WorkspaceId, dict[DatasetId, CustomDataset]]: - Dictionary of validated dataset definitions per workspace, - where each dataset contains its custom fields: - ```python - { - "workspace_id_1": { - "dataset_id_1": CustomDataset(...), - "dataset_id_2": CustomDataset(...), - }, - ... - } - ``` - """ - - # First, validate the dataset definitions and aggregate them per workspace. - validated_data = self._validate_dataset_definitions(raw_dataset_definitions) - - # Then validate the field definitions and connect them to the datasets - validated_data = self._validate_field_definitions( - validated_data, raw_field_definitions - ) - - return validated_data - - def _validate_with_pydantic( - self, raw_data: _RawData, model: Type[ModelT] - ) -> list[ModelT]: - """Validate data using provided Pydantic model. - - Validates each dict to check row level constraints. - - Args: - raw_data (list[dict[str, str]]): List of dictionaries containing raw data. - model (Type[ModelT]): Pydantic model to validate against. - Returns: - list[ModelT]: List of validated model instances. - """ - return [model(**item) for item in raw_data] - - def _validate_dataset_definitions( - self, - raw_dataset_definitions: _RawData, - ) -> dict[_WorkspaceId, dict[_DatasetId, CustomDataset]]: - dataset_definitions: list[CustomDatasetDefinition] = ( - self._validate_with_pydantic( - raw_dataset_definitions, CustomDatasetDefinition - ) - ) - self._check_dataset_combinations(dataset_definitions) - - validated_definitions: dict[_WorkspaceId, dict[_DatasetId, CustomDataset]] = {} - for definition in dataset_definitions: - validated_definitions.setdefault(definition.workspace_id, {})[ - definition.dataset_id - ] = CustomDataset(definition=definition, custom_fields=[]) - - return validated_definitions - - def _check_dataset_combinations( - self, dataset_definitions: list[CustomDatasetDefinition] - ) -> None: - """Check integrity of provided dataset definitions. - - Validation criteria: - - workspace_id + dataset_id must be unique across all dataset definitions. - - Args: - dataset_definitions (list[CustomDatasetDefinition]): List of dataset definitions to check. - Raises: - ValueError: If there are duplicate dataset definitions based on workspace_id and dataset_id. - """ - workspace_dataset_combinations = [ - (definition.workspace_id, definition.dataset_id) - for definition in dataset_definitions - ] - if len(workspace_dataset_combinations) != len( - set(workspace_dataset_combinations) - ): - duplicates = self._get_duplicates(workspace_dataset_combinations) - raise ValueError( - "Duplicate dataset definitions found in the raw dataset " - + f"definitions (workspace_id, dataset_id): {duplicates}" - ) - - @staticmethod - def _get_duplicates(list_to_check: list[Any]) -> list[Any]: - """Get duplicates from a list. - - Args: - list_to_check (list[Any]): List of items to check for duplicates. - Returns: - list[Any]: List of duplicate items. - """ - counts = Counter(list_to_check) - return [item for item, count in counts.items() if count > 1] - - def _check_field_combinations( - self, field_definitions: list[CustomFieldDefinition] - ) -> None: - """Check integrity of provided field definitions. - - Validation criteria (per workspace): - - unique workspace_id + cf_id combinations (only for attribute and fact cf_type) - - there is no row with the same dataset_id and cf_id (only for date cf_type) - - Args: - field_definitions (list[CustomFieldDefinition]): List of field definitions to check. - Raises: - ValueError: If there are duplicate field definitions based on workspace_id and cf_id. - """ - workspace_field_combinations: set[tuple[str, str]] = set() - dataset_field_combinations: set[tuple[str, str]] = set() - - for field in field_definitions: - if field.cf_type in [CustomFieldType.ATTRIBUTE, CustomFieldType.FACT]: - combination = (field.workspace_id, field.cf_id) - if self._set_contains(workspace_field_combinations, combination): - raise ValueError( - f"Duplicate custom field found for workspace {field.workspace_id} " - + f"with field ID {field.cf_id}" - ) - workspace_field_combinations.add(combination) - elif field.cf_type == CustomFieldType.DATE: - combination = (field.dataset_id, field.cf_id) - if self._set_contains(dataset_field_combinations, combination): - raise ValueError( - f"Duplicate custom field found for dataset {field.dataset_id} " - + f"with field ID {field.cf_id}" - ) - dataset_field_combinations.add(combination) - - @staticmethod - def _set_contains(set_to_check: set[Any], item: Any) -> bool: - """Helper function to check if an item is in a set.""" - return item in set_to_check - - def _validate_field_definitions( - self, - validated_definitions: dict[_WorkspaceId, dict[_DatasetId, CustomDataset]], - raw_field_definitions: _RawData, - ) -> dict[_WorkspaceId, dict[_DatasetId, CustomDataset]]: - """Validates custom field definitions amd connects them to the datasets. - - Args: - validated_definitions (dict[WorkspaceId, dict[DatasetId, CustomDataset]]): - Dictionary of validated dataset definitions per workspace. - raw_field_definitions (list[dict[str, str]]): List of raw field definitions to validate. - Returns: - dict[WorkspaceId, dict[DatasetId, CustomDataset]]: - Updated dictionary of validated dataset definitions with custom fields added. - """ - field_definitions: list[CustomFieldDefinition] = self._validate_with_pydantic( - raw_field_definitions, CustomFieldDefinition - ) - self._check_field_combinations(field_definitions) - - for field_definition in field_definitions: - validated_definitions[field_definition.workspace_id][ - field_definition.dataset_id - ].custom_fields.append(field_definition) - - return validated_definitions diff --git a/scripts/custom_fields/models/__init__.py b/scripts/custom_fields/models/__init__.py deleted file mode 100644 index 37d863d..0000000 --- a/scripts/custom_fields/models/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# (C) 2025 GoodData Corporation diff --git a/scripts/custom_fields/models/aliases.py b/scripts/custom_fields/models/aliases.py deleted file mode 100644 index 9405aee..0000000 --- a/scripts/custom_fields/models/aliases.py +++ /dev/null @@ -1,10 +0,0 @@ -# (C) 2025 GoodData Corporation -"""This module defines type aliases intended to improve readability.""" - -from typing import TypeAlias - -_WorkspaceId: TypeAlias = str -_DatasetId: TypeAlias = str -_RawData: TypeAlias = list[dict[str, str]] - -__all__ = ["_WorkspaceId", "_DatasetId", "_RawData"] diff --git a/scripts/custom_fields/models/analytical_object.py b/scripts/custom_fields/models/analytical_object.py deleted file mode 100644 index fede882..0000000 --- a/scripts/custom_fields/models/analytical_object.py +++ /dev/null @@ -1,33 +0,0 @@ -# (C) 2025 GoodData Corporation -"""This module defines the AnalyticalObjects Pydantic model. - -The model is used to represent features of analytical objects important for -checking the validity of references. -""" - -from pydantic import BaseModel, Field - - -class Attributes(BaseModel): - title: str - are_relations_valid: bool = Field(alias="areRelationsValid") - - -class AnalyticalObject(BaseModel): - id: str - type: str - attributes: Attributes - - -class AnalyticalObjects(BaseModel): - """Simplified model representing response obtained from GoodData API when querying - analytical objects. - - This model is used to represent analytical objects such as metrics, visualizations, - and dashboard in a simplified manner, with the purpose of checkinf the validity - of references of these objects. - - This is not a complete schema of the analytical objects! - """ - - data: list[AnalyticalObject] diff --git a/scripts/custom_fields/models/custom_data_object.py b/scripts/custom_fields/models/custom_data_object.py deleted file mode 100644 index 9d2d729..0000000 --- a/scripts/custom_fields/models/custom_data_object.py +++ /dev/null @@ -1,88 +0,0 @@ -# (C) 2025 GoodData Corporation -"""This module defines enums and models used to represent the input data. - -Models defined here are used to validate and structure the input data before -further processing. -""" - -from enum import Enum - -from pydantic import BaseModel, model_validator - - -class CustomFieldType(Enum): - """GoodData field types.""" - - ATTRIBUTE = "attribute" - FACT = "fact" - DATE = "date" - - -class ColumnDataType(Enum): - """Supported data types""" - - INT = "INT" - STRING = "STRING" - DATE = "DATE" - NUMERIC = "NUMERIC" - TIMESTAMP = "TIMESTAMP" - TIMESTAMP_TZ = "TIMESTAMP_TZ" - BOOLEAN = "BOOLEAN" - - -class CustomFieldDefinition(BaseModel): - """Input model for custom field definition.""" - - workspace_id: str - dataset_id: str - cf_id: str - cf_name: str - cf_type: CustomFieldType - cf_source_column: str - cf_source_column_data_type: ColumnDataType - - @model_validator(mode="after") - def check_ids_not_equal(self) -> "CustomFieldDefinition": - """Check that custom field ID is not the same as dataset ID.""" - if self.cf_id == self.dataset_id: - raise ValueError( - f"Custom field ID {self.cf_id} cannot be the same as dataset ID {self.dataset_id}" - ) - return self - - -class CustomDatasetDefinition(BaseModel): - """Input model for custom dataset definition.""" - - workspace_id: str - dataset_id: str - dataset_name: str - dataset_datasource_id: str - dataset_source_table: str | None - dataset_source_sql: str | None - parent_dataset_reference: str - parent_dataset_reference_attribute_id: str - dataset_reference_source_column: str - dataset_reference_source_column_data_type: ColumnDataType - wdf_id: str - wdf_column_name: str - - @model_validator(mode="after") - def check_source(self) -> "CustomDatasetDefinition": - """At least one of dataset_source_table or dataset_source_sql is provided.""" - if not (self.dataset_source_table or self.dataset_source_sql): - raise ValueError( - "One of dataset_source_table and dataset_source_sql must be provided" - ) - if self.dataset_source_table and self.dataset_source_sql: - raise ValueError( - "Only one of dataset_source_table and dataset_source_sql can be provided" - ) - return self - - -class CustomDataset(BaseModel): - """Custom dataset with its definition and custom fields.""" - - definition: CustomDatasetDefinition - custom_fields: list[CustomFieldDefinition] diff --git a/scripts/permission_mgmt.py b/scripts/permission_mgmt.py index cd93676..f2261b5 100644 --- a/scripts/permission_mgmt.py +++ b/scripts/permission_mgmt.py @@ -11,7 +11,7 @@ from gooddata_sdk.utils import PROFILES_FILE_PATH from utils.logger import get_logger, setup_logging # type: ignore[import] from utils.utils import ( # type: ignore[import] - create_provisioner, + create_client, read_csv_file_to_dict, ) @@ -114,7 +114,7 @@ def permission_mgmt(): permissions = read_permissions_from_csv(args) - permission_manager = create_provisioner( + permission_manager = create_client( PermissionProvisioner, args.profile_config, args.profile ) diff --git a/scripts/user_data_filter_mgmt.py b/scripts/user_data_filter_mgmt.py index 1ca2a71..f06c80c 100644 --- a/scripts/user_data_filter_mgmt.py +++ b/scripts/user_data_filter_mgmt.py @@ -8,7 +8,7 @@ from gooddata_sdk.utils import PROFILES_FILE_PATH from utils.logger import get_logger, setup_logging # type: ignore[import] from utils.utils import ( # type: ignore[import] - create_provisioner, + create_client, read_csv_file_to_dict, ) @@ -109,7 +109,7 @@ def udf_mgmt(): validated_user_data_filters = validate_user_data_filter_data(raw_user_data_filters) # Create provisioner and subscribe to logger - provisioner: UserDataFilterProvisioner = create_provisioner( + provisioner: UserDataFilterProvisioner = create_client( UserDataFilterProvisioner, args.profile_config, args.profile ) diff --git a/scripts/user_group_mgmt.py b/scripts/user_group_mgmt.py index 7373941..f26810d 100644 --- a/scripts/user_group_mgmt.py +++ b/scripts/user_group_mgmt.py @@ -22,7 +22,7 @@ from gooddata_sdk.utils import PROFILES_FILE_PATH from utils.logger import get_logger, setup_logging # type: ignore[import] from utils.utils import ( # type: ignore[import] - create_provisioner, + create_client, read_csv_file_to_dict, ) @@ -142,7 +142,7 @@ def user_group_mgmt(): try: validate_args(args) - provisioner = create_provisioner( + provisioner = create_client( UserGroupProvisioner, args.profile_config, args.profile ) diff --git a/scripts/user_mgmt.py b/scripts/user_mgmt.py index 1378eb8..06da07d 100644 --- a/scripts/user_mgmt.py +++ b/scripts/user_mgmt.py @@ -8,7 +8,7 @@ from gooddata_pipelines import UserIncrementalLoad, UserProvisioner from gooddata_sdk.utils import PROFILES_FILE_PATH from utils.logger import get_logger, setup_logging # type: ignore[import] -from utils.utils import create_provisioner # type: ignore[import] +from utils.utils import create_client # type: ignore[import] UG_REGEX = r"^(?!\.)[.A-Za-z0-9_-]{1,255}$" @@ -135,7 +135,7 @@ def user_mgmt() -> None: args.user_csv, args.delimiter, args.quotechar, args.ug_delimiter ) - provisioner = create_provisioner(UserProvisioner, args.profile_config, args.profile) + provisioner = create_client(UserProvisioner, args.profile_config, args.profile) provisioner.logger.subscribe(logger) diff --git a/scripts/utils/utils.py b/scripts/utils/utils.py index a953289..9614dee 100644 --- a/scripts/utils/utils.py +++ b/scripts/utils/utils.py @@ -5,13 +5,25 @@ import logging import os from pathlib import Path -from typing import Type - -from gooddata_pipelines.provisioning.provisioning import Provisioning +from typing import Protocol, Type logger = logging.getLogger(__name__) +class PipelinesClient(Protocol): + """Protocol for GoodData Pipelines clients (Provisioners, Managers...).""" + + @classmethod + def create(cls, host: str, token: str) -> "PipelinesClient": + pass + + @classmethod + def create_from_profile( + cls, profile: str, profiles_path: Path + ) -> "PipelinesClient": + pass + + def read_csv_file_to_dict( file_path: str, delimiter: str = ",", quotechar: str = '"' ) -> list[dict[str, str]]: @@ -27,20 +39,20 @@ def read_csv_file_to_dict( return list(csv.DictReader(file, delimiter=delimiter, quotechar=quotechar)) -def create_provisioner( - ProvisionerType: Type[Provisioning], profile_config: Path, profile: str -) -> Provisioning: - """Creates GoodData SDK client.""" +def create_client( + client_type: Type[PipelinesClient], profile_config: Path, profile: str +) -> PipelinesClient: + """Creates GoodData Pipelines client of given type.""" gdc_auth_token = os.environ.get("GDC_AUTH_TOKEN") gdc_hostname = os.environ.get("GDC_HOSTNAME") if gdc_hostname and gdc_auth_token: logger.info("Using GDC_HOSTNAME and GDC_AUTH_TOKEN envvars.") - return ProvisionerType.create(host=gdc_hostname, token=gdc_auth_token) + return client_type.create(host=gdc_hostname, token=gdc_auth_token) if os.path.exists(profile_config): logger.info(f"Using GoodData profile {profile} sourced from {profile_config}.") - return ProvisionerType.create_from_profile( + return client_type.create_from_profile( profile=profile, profiles_path=profile_config ) diff --git a/scripts/workspace_mgmt.py b/scripts/workspace_mgmt.py index 2817b49..b38ff44 100644 --- a/scripts/workspace_mgmt.py +++ b/scripts/workspace_mgmt.py @@ -8,7 +8,7 @@ from gooddata_sdk.utils import PROFILES_FILE_PATH from utils.logger import get_logger, setup_logging # type: ignore[import] from utils.utils import ( # type: ignore[import] - create_provisioner, + create_client, read_csv_file_to_dict, ) @@ -133,9 +133,7 @@ def workspace_mgmt(): validated_workspaces = validate_workspace_data(raw_workspaces, args.inner_delimiter) # Create provisioner and subscribe to logger - provisioner = create_provisioner( - WorkspaceProvisioner, args.profile_config, args.profile - ) + provisioner = create_client(WorkspaceProvisioner, args.profile_config, args.profile) provisioner.logger.subscribe(logger) diff --git a/tests/data/custom_fields/response_get_all_dashboards.json b/tests/data/custom_fields/response_get_all_dashboards.json deleted file mode 100644 index 0b88398..0000000 --- a/tests/data/custom_fields/response_get_all_dashboards.json +++ /dev/null @@ -1,72 +0,0 @@ -{ - "data": [ - { - "id": "dashboard_id_1", - "type": "analyticalDashboard", - "attributes": { - "title": "Custom Dashboard", - "areRelationsValid": true, - "content": { - "layout": { - "type": "IDashboardLayout", - "sections": [ - { - "type": "IDashboardLayoutSection", - "items": [ - { - "type": "IDashboardLayoutItem", - "size": { - "xl": { - "gridWidth": 12, - "gridHeight": 12 - } - }, - "widget": { - "insight": { - "identifier": { - "id": "visualization_id_1", - "type": "visualizationObject" - } - } - } - }, - { - "type": "IDashboardLayoutItem", - "size": { - "xl": { - "gridWidth": 12, - "gridHeight": 12 - } - }, - "widget": { - "insight": { - "identifier": { - "id": "visualization_id_2", - "type": "visualizationObject" - } - } - } - } - ] - } - ] - } - }, - "createdAt": "2025-06-17 13:13" - }, - "links": { - "self": "https://link-to-self.com" - }, - "meta": { - "origin": { - "originType": "NATIVE", - "originId": "workspace_id_1" - } - } - } - ], - "links": { - "self": "https://link-to-self.com", - "next": "https://link-to-next.com" - } -} diff --git a/tests/data/custom_fields/response_get_all_metrics.json b/tests/data/custom_fields/response_get_all_metrics.json deleted file mode 100644 index 74407eb..0000000 --- a/tests/data/custom_fields/response_get_all_metrics.json +++ /dev/null @@ -1,78 +0,0 @@ -{ - "data": [ - { - "id": "metric_id_1", - "type": "metric", - "attributes": { - "title": "metric title 1", - "description": "", - "areRelationsValid": true, - "createdAt": "2025-06-12 13:26", - "content": { - "format": "#,##0.00", - "maql": "select AVG({fact/some_fact})" - } - }, - "links": { - "self": "https://link-to-self.com" - }, - "meta": { - "origin": { - "originType": "NATIVE", - "originId": "workspace_id_1" - } - } - }, - { - "id": "metric_id_2", - "type": "metric", - "attributes": { - "title": "metric title 2", - "description": "", - "areRelationsValid": true, - "createdAt": "2025-06-13 08:12", - "content": { - "format": "#,##0.00", - "maql": "select AVG({fact/some_other_fact})" - } - }, - "links": { - "self": "https://link-to-self.com" - }, - "meta": { - "origin": { - "originType": "NATIVE", - "originId": "workspace_id_1" - } - } - }, - { - "id": "metric_id_3", - "type": "metric", - "attributes": { - "title": "metric title 3", - "description": "", - "areRelationsValid": false, - "createdAt": "2025-06-13 08:12", - "modifiedAt": "2025-06-13 08:16", - "content": { - "format": "#,##0.00", - "maql": "SELECT SUM( {fact/some_fact}* {fact/some_other_fact} )" - } - }, - "links": { - "self": "https://link-to-self.com" - }, - "meta": { - "origin": { - "originType": "NATIVE", - "originId": "custom_field_child" - } - } - } - ], - "links": { - "self": "https://link-to-self.com", - "next": "https://link-to-self.com" - } -} diff --git a/tests/data/custom_fields/response_get_all_visualizations.json b/tests/data/custom_fields/response_get_all_visualizations.json deleted file mode 100644 index e64c66a..0000000 --- a/tests/data/custom_fields/response_get_all_visualizations.json +++ /dev/null @@ -1,143 +0,0 @@ -{ - "data": [ - { - "id": "visualization_id_1", - "type": "visualizationObject", - "attributes": { - "title": "chart title 1", - "description": "", - "areRelationsValid": true, - "content": { - "buckets": [ - { - "items": [ - { - "measure": { - "localIdentifier": "f3be64be0d3a49019088462bfe87d31f", - "definition": { - "measureDefinition": { - "item": { - "identifier": { - "id": "metric_id_1", - "type": "metric" - } - }, - "filters": [] - } - }, - "title": "item title 1" - } - } - ], - "localIdentifier": "measures" - }, - { - "items": [ - { - "attribute": { - "localIdentifier": "a5a36aa84014410aaaa2f16ade7d3808", - "displayForm": { - "identifier": { "id": "attribute id", "type": "label" } - } - } - } - ], - "localIdentifier": "attribute" - } - ], - "filters": [], - "sorts": [ - { - "attributeSortItem": { - "attributeIdentifier": "a5a36aa84014410aaaa2f16ade7d3808", - "direction": "asc" - } - } - ], - "properties": {}, - "visualizationUrl": "local:table", - "version": "2" - }, - "createdAt": "2025-06-12 13:28" - }, - "links": { - "self": "http://link-to-self.com" - }, - "meta": { - "origin": { "originType": "NATIVE", "originId": "workspace_id_1" } - } - }, - { - "id": "visualization_id_2", - "type": "visualizationObject", - "attributes": { - "title": "chart title 2", - "description": "", - "areRelationsValid": true, - "content": { - "buckets": [ - { - "items": [ - { - "measure": { - "localIdentifier": "91afbe18dca94984bc0ebb42b6b9f814", - "definition": { - "measureDefinition": { - "item": { - "identifier": { - "id": "metric_id_2", - "type": "metric" - } - }, - "filters": [] - } - }, - "title": "item title 2" - } - } - ], - "localIdentifier": "measures" - }, - { - "items": [ - { - "attribute": { - "localIdentifier": "5c5a83f5b5194fed9d4de1170acf3fef", - "displayForm": { - "identifier": { "id": "attribute id_1", "type": "label" } - } - } - }, - { - "attribute": { - "localIdentifier": "f80144be70944ea0be6b12d130f8ef0e", - "displayForm": { - "identifier": { "id": "attribute id_2", "type": "label" } - } - } - } - ], - "localIdentifier": "view" - } - ], - "filters": [], - "sorts": [], - "properties": {}, - "visualizationUrl": "local:column", - "version": "2" - }, - "createdAt": "2025-06-16 21:12" - }, - "links": { - "self": "https://link-to-self.com" - }, - "meta": { - "origin": { "originType": "NATIVE", "originId": "workspace_id_1" } - } - } - ], - "links": { - "self": "https://link-to-self.com", - "next": "https://link-to-next.com" - } -} diff --git a/tests/test_custom_fields/__init__.py b/tests/test_custom_fields/__init__.py deleted file mode 100644 index 37d863d..0000000 --- a/tests/test_custom_fields/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# (C) 2025 GoodData Corporation diff --git a/tests/test_custom_fields/test_custom_fields.py b/tests/test_custom_fields/test_custom_fields.py deleted file mode 100644 index 23c3963..0000000 --- a/tests/test_custom_fields/test_custom_fields.py +++ /dev/null @@ -1,150 +0,0 @@ -import os -import sys - -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../scripts")) -) -import pytest -from custom_fields.custom_field_manager import ( # type: ignore[import] - CustomFieldManager, -) -from pytest_mock import MockerFixture - - -@pytest.fixture -def manager(mocker: MockerFixture): - # Patch dependencies in the constructor - mocker.patch("scripts.custom_fields.custom_field_manager.CustomFieldsDataValidator") - mocker.patch("scripts.custom_fields.custom_field_manager.CustomFieldsDataProcessor") - mocker.patch("scripts.custom_fields.custom_field_manager.GoodDataSdk") - mocker.patch("scripts.custom_fields.custom_field_manager.GoodDataAPI") - return CustomFieldManager(host="host", token="token") - - -@pytest.fixture -def validated_data(mocker: MockerFixture): - # Minimal valid structure for validated_data - return {"workspace_1": {"dataset_1": mocker.MagicMock()}} - - -def make_analytical_object(mocker: MockerFixture, id, title="Title", type="type"): - obj = mocker.MagicMock() - obj.id = id - obj.type = type - obj.attributes.title = title - return obj - - -def test_relations_check_success(manager, validated_data, mocker: MockerFixture): - """Relation check passes, workspace layout not reverted.""" - # Setup mocks - mocker.patch.object( - manager._api, - "get_workspace_layout", - return_value=mocker.MagicMock( - json=mocker.MagicMock(return_value="layout_json") - ), - ) - mocker.patch.object( - manager, - "_get_analytical_objects", - side_effect=[ - [make_analytical_object(mocker, "a", "A")], # current - [make_analytical_object(mocker, "a", "A")], # new - ], - ) - mocker.patch.object( - manager, - "_get_objects_with_invalid_relations", - side_effect=[ - set(), # current_invalid_relations - set(), # new_invalid_relations - ], - ) - mocker.patch.object(manager._processor, "datasets_to_ldm", return_value="ldm") - mocker.patch.object(manager._sdk.catalog_workspace_content, "put_declarative_ldm") - mocker.patch.object( - manager, "_new_ldm_does_not_invalidate_relations", return_value=True - ) - mocker.patch.object(manager._api, "put_workspace_layout") - - # Should print "Workspace workspace_1 LDM updated." and not revert - manager._process_with_relations_check(validated_data) - manager._sdk.catalog_workspace_content.put_declarative_ldm.assert_called_once() - manager._api.put_workspace_layout.assert_not_called() - - -def test_relations_check_failure_and_revert( - manager, validated_data, capsys, mocker: MockerFixture -): - """Relation check fails, workspace layout is reverted.""" - # Setup mocks - mocker.patch.object( - manager._api, - "get_workspace_layout", - return_value=mocker.MagicMock( - json=mocker.MagicMock(return_value="layout_json") - ), - ) - obj1 = make_analytical_object(mocker, "a", "A") - obj2 = make_analytical_object(mocker, "b", "B") - mocker.patch.object( - manager, - "_get_objects_with_invalid_relations", - side_effect=[ - {obj1}, # current_invalid_relations - {obj1, obj2}, # new_invalid_relations (one more invalid) - ], - ) - mocker.patch.object(manager._processor, "datasets_to_ldm", return_value="ldm") - mocker.patch.object(manager._sdk.catalog_workspace_content, "put_declarative_ldm") - mocker.patch.object( - manager, "_new_ldm_does_not_invalidate_relations", return_value=False - ) - mocker.patch.object(manager._api, "put_workspace_layout") - - manager._process_with_relations_check(validated_data) - - # Should revert and print info about invalid relations - manager._api.put_workspace_layout.assert_called_once_with( - workspace_id="workspace_1", layout="layout_json" - ) - out = capsys.readouterr().out - assert "Difference in invalid relations found in workspace workspace_1." in out - assert "b (type) B" in out - assert "Reverting the workspace layout to the original state." in out - - -def test_relations_check_fewer_invalid_relations( - manager, validated_data, mocker: MockerFixture -): - """Fewer invalid relations after LDM update, no revert needed.""" - # Setup mocks - obj1 = make_analytical_object(mocker, "a", "A") - mocker.patch.object( - manager._api, - "get_workspace_layout", - return_value=mocker.MagicMock( - json=mocker.MagicMock(return_value="layout_json") - ), - ) - mocker.patch.object( - manager, - "_get_objects_with_invalid_relations", - side_effect=[ - { - obj1, - make_analytical_object(mocker, "b", "B"), - }, # current_invalid_relations - {obj1}, # new_invalid_relations (fewer) - ], - ) - mocker.patch.object(manager._processor, "datasets_to_ldm", return_value="ldm") - mocker.patch.object(manager._sdk.catalog_workspace_content, "put_declarative_ldm") - mocker.patch.object( - manager, "_new_ldm_does_not_invalidate_relations", return_value=True - ) - mocker.patch.object(manager._api, "put_workspace_layout") - - manager._process_with_relations_check(validated_data) - manager._api.put_workspace_layout.assert_not_called() diff --git a/tests/test_custom_fields/test_input_processor.py b/tests/test_custom_fields/test_input_processor.py deleted file mode 100644 index c92ef4b..0000000 --- a/tests/test_custom_fields/test_input_processor.py +++ /dev/null @@ -1,175 +0,0 @@ -import os -import sys - -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../scripts")) -) - -import pytest -from custom_fields.input_processor import ( # type: ignore[import] - CustomFieldsDataProcessor, -) -from custom_fields.models.custom_data_object import ( # type: ignore[import] - ColumnDataType, - CustomDataset, - CustomDatasetDefinition, - CustomFieldDefinition, - CustomFieldType, -) - - -@pytest.fixture -def mock_custom_field_attribute(): - return CustomFieldDefinition( - workspace_id="workspace1", - dataset_id="ds1", - cf_id="attr1", - cf_name="Attribute 1", - cf_type=CustomFieldType.ATTRIBUTE, - cf_source_column="col_attr1", - cf_source_column_data_type=ColumnDataType.STRING, - ) - - -@pytest.fixture -def mock_custom_field_fact(): - return CustomFieldDefinition( - workspace_id="workspace1", - dataset_id="ds1", - cf_id="fact1", - cf_name="Fact 1", - cf_type=CustomFieldType.FACT, - cf_source_column="col_fact1", - cf_source_column_data_type=ColumnDataType.INT, - ) - - -@pytest.fixture -def mock_custom_field_date(): - return CustomFieldDefinition( - workspace_id="workspace1", - dataset_id="ds1", - cf_id="date1", - cf_name="Date 1", - cf_type=CustomFieldType.DATE, - cf_source_column="col_date1", - cf_source_column_data_type=ColumnDataType.DATE, - ) - - -@pytest.fixture -def mock_dataset_definition(): - return CustomDatasetDefinition( - workspace_id="workspace1", - dataset_id="ds1", - dataset_name="Dataset 1", - dataset_source_table="table1", - dataset_datasource_id="ds_source", - dataset_source_sql=None, - parent_dataset_reference="parent_ds", - parent_dataset_reference_attribute_id="parent_attr", - dataset_reference_source_column="ref_col", - dataset_reference_source_column_data_type=ColumnDataType.STRING, - wdf_id="wdf1", - wdf_column_name="col1", - ) - - -@pytest.fixture -def mock_custom_dataset( - mock_dataset_definition, - mock_custom_field_attribute, - mock_custom_field_fact, - mock_custom_field_date, -): - return CustomDataset( - definition=mock_dataset_definition, - custom_fields=[ - mock_custom_field_attribute, - mock_custom_field_fact, - mock_custom_field_date, - ], - ) - - -def test_attribute_from_field(mock_custom_field_attribute): - attr = CustomFieldsDataProcessor._attribute_from_field( - "dataset_name", mock_custom_field_attribute - ) - assert attr.id == "attr1" - assert attr.title == "Attribute 1" - assert attr.source_column == "col_attr1" - assert attr.source_column_data_type == ColumnDataType.STRING.value - assert attr.tags == ["dataset_name"] - - -def test_fact_from_field(mock_custom_field_fact): - fact = CustomFieldsDataProcessor._fact_from_field( - "dataset_name", mock_custom_field_fact - ) - assert fact.id == "fact1" - assert fact.title == "Fact 1" - assert fact.source_column == "col_fact1" - assert fact.source_column_data_type == ColumnDataType.INT.value - assert fact.tags == ["dataset_name"] - - -def test_date_from_field(mock_custom_field_date): - processor = CustomFieldsDataProcessor() - date_ds = processor._date_from_field("dataset_name", mock_custom_field_date) - assert date_ds.id == "date1" - assert date_ds.title == "Date 1" - assert set(date_ds.granularities) == set(processor.DATE_GRANULARITIES) - assert date_ds.tags == ["dataset_name"] - - -def test_date_ref_from_field(mock_custom_field_date): - ref = CustomFieldsDataProcessor._date_ref_from_field(mock_custom_field_date) - assert ref.identifier.id == "date1" - assert ref.sources - assert ref.sources[0].column == "col_date1" - assert ref.sources[0].data_type == ColumnDataType.DATE.value - - -def test_get_sources_table_only(mock_dataset_definition): - mock_dataset_definition.dataset_source_sql = None - dataset = CustomDataset(definition=mock_dataset_definition, custom_fields=[]) - table_id, sql = CustomFieldsDataProcessor._get_sources(dataset) - assert table_id is not None - assert table_id.id == "table1" - assert sql is None - - -def test_get_sources_sql_only(mock_dataset_definition): - mock_dataset_definition.dataset_source_table = None - mock_dataset_definition.dataset_source_sql = "SELECT * FROM foo" - dataset = CustomDataset(definition=mock_dataset_definition, custom_fields=[]) - table_id, sql = CustomFieldsDataProcessor._get_sources(dataset) - assert table_id is None - assert sql is not None - assert sql.statement == "SELECT * FROM foo" - - -def test_datasets_to_ldm(mock_custom_dataset): - print(mock_custom_dataset) - processor = CustomFieldsDataProcessor() - datasets = {"ds1": mock_custom_dataset} - model = processor.datasets_to_ldm(datasets) - # Check that the model contains the expected dataset and date instance - ldm = model.ldm - assert ldm - assert len(ldm.datasets) == 1 - ds = ldm.datasets[0] - assert ds.id == "ds1" - assert ds.title == "Dataset 1" - assert ds.attributes - assert ds.facts - assert len(ds.attributes) == 1 - assert len(ds.facts) == 1 - assert len(ds.references) == 2 # 1 parent + 1 date - assert ds.workspace_data_filter_columns - assert ds.workspace_data_filter_references - assert ds.workspace_data_filter_columns[0].name == "col1" - assert ds.workspace_data_filter_references[0].filter_id.id == "wdf1" - assert len(ldm.date_instances) == 1 - assert ldm.date_instances[0].id == "date1" diff --git a/tests/test_custom_fields/test_input_validator.py b/tests/test_custom_fields/test_input_validator.py deleted file mode 100644 index 57ce14d..0000000 --- a/tests/test_custom_fields/test_input_validator.py +++ /dev/null @@ -1,203 +0,0 @@ -import os -import sys - -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../scripts")) -) - -import pytest -from custom_fields.input_validator import ( # type: ignore[import] - CustomFieldsDataValidator, -) -from custom_fields.models.custom_data_object import ( # type: ignore[import] - CustomDataset, -) -from pydantic import ValidationError - - -@pytest.fixture -def valid_dataset_definitions(): - """Fixture to provide valid dataset definitions for testing.""" - return [ - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "dataset_name": "Dataset 1", - "dataset_datasource_id": "ds_source_1", - "dataset_source_table": "table1", - "dataset_source_sql": None, - "parent_dataset_reference": "parent1", - "parent_dataset_reference_attribute_id": "parent1.id", - "dataset_reference_source_column": "id", - "dataset_reference_source_column_data_type": "STRING", - "wdf_id": "wdf1", - "wdf_column_name": "id", - }, - { - "workspace_id": "ws2", - "dataset_id": "ds1", - "dataset_name": "Dataset 2", - "dataset_datasource_id": "ds_source_2", - "dataset_source_table": "table2", - "dataset_source_sql": None, - "parent_dataset_reference": "parent2", - "parent_dataset_reference_attribute_id": "parent2.id", - "dataset_reference_source_column": "id", - "dataset_reference_source_column_data_type": "INT", - "wdf_id": "wdf2", - "wdf_column_name": "id", - }, - ] - - -@pytest.fixture -def valid_field_definitions(): - """Fixture to provide valid field definitions for testing.""" - return [ - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_id": "cf1", - "cf_name": "Field 1", - "cf_type": "attribute", - "cf_source_column": "col1", - "cf_source_column_data_type": "STRING", - }, - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_id": "cf2", - "cf_name": "Field 2", - "cf_type": "attribute", - "cf_source_column": "col2", - "cf_source_column_data_type": "STRING", - }, - { - "workspace_id": "ws2", - "dataset_id": "ds1", - "cf_id": "cf3", - "cf_name": "Field 3", - "cf_type": "attribute", - "cf_source_column": "col3", - "cf_source_column_data_type": "STRING", - }, - ] - - -def test_validate_success(valid_dataset_definitions, valid_field_definitions): - """Provide valid input data and expect successful validation.""" - validator = CustomFieldsDataValidator() - result = validator.validate(valid_dataset_definitions, valid_field_definitions) - assert isinstance(result, dict) - assert "ws1" in result - assert "ds1" in result["ws1"] - assert isinstance(result["ws1"]["ds1"], CustomDataset) - assert len(result["ws1"]["ds1"].custom_fields) == 2 - assert result["ws2"]["ds1"].custom_fields[0].cf_id == "cf3" - - -def test_duplicate_dataset_raises(valid_dataset_definitions): - """Test that duplicate dataset definitions raise a ValueError.""" - # Add a duplicate dataset definition - invalid = valid_dataset_definitions + [ - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "dataset_name": "Dataset 1", - "dataset_datasource_id": "ds_source_1", - "dataset_source_table": "table1", - "dataset_source_sql": None, - "parent_dataset_reference": "parent1", - "parent_dataset_reference_attribute_id": "parent1.id", - "dataset_reference_source_column": "id", - "dataset_reference_source_column_data_type": "STRING", - "wdf_id": "wdf1", - "wdf_column_name": "id", - } - ] - validator = CustomFieldsDataValidator() - with pytest.raises(ValueError, match="Duplicate dataset definitions"): - validator.validate(invalid, []) - - -def test_duplicate_field_workspace_level(valid_dataset_definitions): - """Duplicate cf_id for ATTRIBUTE in same workspace. should raise ValueError.""" - fields = [ - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_id": "cf1", - "cf_type": "attribute", - "cf_name": "Field 1", - "cf_source_column": "col1", - "cf_source_column_data_type": "STRING", - }, - { - "workspace_id": "ws1", - "dataset_id": "ds2", - "cf_id": "cf1", - "cf_type": "attribute", - "cf_name": "Field 2", - "cf_source_column": "col2", - "cf_source_column_data_type": "STRING", - }, - ] - validator = CustomFieldsDataValidator() - with pytest.raises( - ValueError, - match="Duplicate custom field found for workspace ws1 with field ID cf1", - ): - validator.validate(valid_dataset_definitions, fields) - - -def test_duplicate_field_dataset_level(valid_dataset_definitions): - """Duplicate cf_id for DATE in same dataset. should raise ValueError.""" - fields = [ - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_id": "cf1", - "cf_type": "date", - "cf_name": "Field 1", - "cf_source_column": "col1", - "cf_source_column_data_type": "DATE", - }, - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_id": "cf1", - "cf_type": "date", - "cf_name": "Field 2", - "cf_source_column": "col2", - "cf_source_column_data_type": "DATE", - }, - ] - validator = CustomFieldsDataValidator() - with pytest.raises( - ValueError, - match="Duplicate custom field found for dataset ds1 with field ID cf1", - ): - validator.validate(valid_dataset_definitions, fields) - - -def test_invalid_data_structure(valid_dataset_definitions): - """Invalid shape of input data will raise ValidationError.""" - fields = [ - { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_type": "attribute", - "cf_name": "Field 1", - } - ] - validator = CustomFieldsDataValidator() - with pytest.raises(ValidationError): - validator.validate(valid_dataset_definitions, fields) - - -def test_invalid_dataset_model(): - """Missing fields will raise ValidationError.""" - datasets = [{"workspace_id": "ws1", "name": "Dataset 1"}] - validator = CustomFieldsDataValidator() - with pytest.raises(ValidationError): - validator.validate(datasets, []) diff --git a/tests/test_custom_fields/test_models/__init__.py b/tests/test_custom_fields/test_models/__init__.py deleted file mode 100644 index 37d863d..0000000 --- a/tests/test_custom_fields/test_models/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# (C) 2025 GoodData Corporation diff --git a/tests/test_custom_fields/test_models/test_analytical_object.py b/tests/test_custom_fields/test_models/test_analytical_object.py deleted file mode 100644 index 730ced0..0000000 --- a/tests/test_custom_fields/test_models/test_analytical_object.py +++ /dev/null @@ -1,69 +0,0 @@ -# (C) 2025 GoodData Corporation -import os -import sys - -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../scripts")) -) - -import json - -import pytest -from custom_fields.models.analytical_object import ( # type: ignore[import] - AnalyticalObject, - AnalyticalObjects, -) - - -@pytest.mark.parametrize( - "file_path", - [ - "tests/data/custom_fields/response_get_all_metrics.json", - "tests/data/custom_fields/response_get_all_visualizations.json", - "tests/data/custom_fields/response_get_all_dashboards.json", - ], -) -def test_analytical_object_model_with_metrics(file_path): - with open(file_path, "r") as file: - data = json.load(file) - analytical_objects = AnalyticalObjects(**data) - assert isinstance(analytical_objects, AnalyticalObjects) - assert isinstance(analytical_objects.data, list) - assert all(isinstance(obj, AnalyticalObject) for obj in analytical_objects.data) - - -@pytest.mark.parametrize( - "response", - [ - { - "something": "unexpected", - }, - { - "data": [ - { - # "id": "metric1", # Missing id field - "type": "metric", - "attributes": { - "title": "Test Metric", - "areRelationsValid": True, - }, - } - ] - }, - { - "data": [ - { - "id": 123, # invalid id type - "type": "metric", - "attributes": { - "title": "Test Metric", - "areRelationsValid": True, - }, - } - ] - }, - ], -) -def test_analytical_object_model_with_invalid_response(response): - with pytest.raises(ValueError): - AnalyticalObjects(**response) diff --git a/tests/test_custom_fields/test_models/test_custom_data_object.py b/tests/test_custom_fields/test_models/test_custom_data_object.py deleted file mode 100644 index a4c6625..0000000 --- a/tests/test_custom_fields/test_models/test_custom_data_object.py +++ /dev/null @@ -1,107 +0,0 @@ -# (C) 2025 GoodData Corporation -import os -import sys - -sys.path.insert( - 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../scripts")) -) - -import pytest -from pydantic import ValidationError - -from scripts.custom_fields.models.custom_data_object import ( - ColumnDataType, - CustomDataset, - CustomDatasetDefinition, - CustomFieldDefinition, - CustomFieldType, -) - - -def make_valid_field_def(**kwargs): - data = { - "workspace_id": "ws1", - "dataset_id": "ds1", - "cf_id": "cf1", - "cf_name": "Custom Field", - "cf_type": CustomFieldType.ATTRIBUTE, - "cf_source_column": "col1", - "cf_source_column_data_type": ColumnDataType.STRING, - } - data.update(kwargs) - return data - - -def make_valid_dataset_def(**kwargs): - data = { - "workspace_id": "ws1", - "dataset_id": "ds1", - "dataset_name": "Dataset", - "dataset_datasource_id": "dsrc1", - "dataset_source_table": "table1", - "dataset_source_sql": None, - "parent_dataset_reference": "parent_ds", - "parent_dataset_reference_attribute_id": "parent_attr", - "dataset_reference_source_column": "src_col", - "dataset_reference_source_column_data_type": ColumnDataType.STRING, - "wdf_id": "wdf1", - "wdf_column_name": "col1", - } - data.update(kwargs) - return data - - -def test_custom_field_definition_valid(): - field = CustomFieldDefinition(**make_valid_field_def()) - assert field.cf_id == "cf1" - assert field.cf_type == CustomFieldType.ATTRIBUTE - - -def test_custom_field_definition_cf_id_equals_dataset_id_raises(): - data = make_valid_field_def(cf_id="ds1") - with pytest.raises(ValidationError) as exc: - CustomFieldDefinition(**data) - assert "cannot be the same as dataset ID" in str(exc.value) - - -def test_custom_dataset_definition_valid_table(): - ds = CustomDatasetDefinition(**make_valid_dataset_def()) - assert ds.dataset_source_table == "table1" - assert ds.dataset_source_sql is None - - -def test_custom_dataset_definition_valid_sql(): - data = make_valid_dataset_def( - dataset_source_table=None, dataset_source_sql="SELECT 1" - ) - ds = CustomDatasetDefinition(**data) - assert ds.dataset_source_sql == "SELECT 1" - assert ds.dataset_source_table is None - - -def test_custom_dataset_definition_both_none_raises(): - data = make_valid_dataset_def(dataset_source_table=None, dataset_source_sql=None) - with pytest.raises(ValidationError) as exc: - CustomDatasetDefinition(**data) - assert "must be provided" in str(exc.value) - - -def test_custom_dataset_definition_both_provided_raises(): - data = make_valid_dataset_def( - dataset_source_table="table1", dataset_source_sql="SELECT 1" - ) - with pytest.raises(ValidationError) as exc: - CustomDatasetDefinition(**data) - assert ( - "Only one of dataset_source_table and dataset_source_sql can be provided" - in str(exc.value) - ) - - -def test_custom_dataset_model(): - ds_def = CustomDatasetDefinition(**make_valid_dataset_def()) - field_def = CustomFieldDefinition(**make_valid_field_def()) - dataset = CustomDataset(definition=ds_def, custom_fields=[field_def]) - assert dataset.definition.dataset_id == "ds1" - assert len(dataset.custom_fields) == 1 - assert dataset.custom_fields[0].cf_id == "cf1"