From 76007074b90658cfb7ab53c28ad249f6c0f2358a Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Mon, 25 May 2026 14:26:38 +0000 Subject: [PATCH] Add Google system test resource cleanup script --- docs/spelling_wordlist.txt | 1 + .../google/resources_cleanup/.airflowignore | 1 + .../google/resources_cleanup/.gitignore | 25 ++ .../system/google/resources_cleanup/README.md | 110 ++++++++ .../google/resources_cleanup/__init__.py | 16 ++ .../__init__.py | 17 ++ .../__main__.py | 132 +++++++++ .../commands/__init__.py | 17 ++ .../commands/cmd_delete.py | 207 ++++++++++++++ .../commands/cmd_list.py | 98 +++++++ .../commands/cmd_list_asset_types.py | 122 +++++++++ .../commands/cmd_tree.py | 191 +++++++++++++ .../constants.py | 92 +++++++ .../handlers/__init__.py | 47 ++++ .../handlers/_base.py | 86 ++++++ .../handlers/ai.py | 259 ++++++++++++++++++ .../handlers/bq.py | 62 +++++ .../handlers/composer.py | 45 +++ .../handlers/compute.py | 49 ++++ .../handlers/dataflow.py | 42 +++ .../handlers/dataform.py | 69 +++++ .../handlers/dataplex.py | 57 ++++ .../handlers/dataproc.py | 71 +++++ .../handlers/dataproc_metastore.py | 45 +++ .../handlers/dlp.py | 43 +++ .../handlers/kafka.py | 44 +++ .../handlers/kubernetes_engine.py | 34 +++ .../handlers/logging.py | 39 +++ .../handlers/ray.py | 60 ++++ .../handlers/spanner.py | 72 +++++ .../handlers/sqladmin.py | 52 ++++ .../handlers/storage.py | 44 +++ .../helpers.py | 216 +++++++++++++++ .../templates/__init__.py | 17 ++ .../templates/base.html | 145 ++++++++++ .../templates/macros.html | 37 +++ .../templates/resource_name.html | 29 ++ .../templates/resources.html | 29 ++ .../resources_cleanup/config/__init__.py | 16 ++ .../google/resources_cleanup/data/__init__.py | 16 ++ .../google/resources_cleanup/pyproject.toml | 76 +++++ .../resources_cleanup/tests/__init__.py | 17 ++ .../resources_cleanup/tests/conftest.py | 52 ++++ .../tests/test_cmd_delete.py | 148 ++++++++++ .../resources_cleanup/tests/test_cmd_list.py | 103 +++++++ 
.../resources_cleanup/tests/test_cmd_tree.py | 146 ++++++++++ .../providers/openlineage/hooks/__init__.py | 16 ++ 47 files changed, 3312 insertions(+) create mode 100644 providers/google/tests/system/google/resources_cleanup/.airflowignore create mode 100644 providers/google/tests/system/google/resources_cleanup/.gitignore create mode 100644 providers/google/tests/system/google/resources_cleanup/README.md create mode 100644 providers/google/tests/system/google/resources_cleanup/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__main__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_delete.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list_asset_types.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_tree.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/constants.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/_base.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/ai.py create mode 100644 
providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/bq.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/composer.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/compute.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataflow.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataform.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataplex.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataproc.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataproc_metastore.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dlp.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/kafka.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/kubernetes_engine.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/logging.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/ray.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/spanner.py create mode 100644 
providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/sqladmin.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/storage.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/helpers.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/base.html create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/macros.html create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resource_name.html create mode 100644 providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resources.html create mode 100644 providers/google/tests/system/google/resources_cleanup/config/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/data/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/pyproject.toml create mode 100644 providers/google/tests/system/google/resources_cleanup/tests/__init__.py create mode 100644 providers/google/tests/system/google/resources_cleanup/tests/conftest.py create mode 100644 providers/google/tests/system/google/resources_cleanup/tests/test_cmd_delete.py create mode 100644 providers/google/tests/system/google/resources_cleanup/tests/test_cmd_list.py create mode 100644 providers/google/tests/system/google/resources_cleanup/tests/test_cmd_tree.py create mode 100644 providers/openlineage/src/airflow/providers/openlineage/hooks/__init__.py diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 
a108634d229db..6161c72ed8abf 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1549,6 +1549,7 @@ Splunk Sql sql sqla +sqladmin Sqlalchemy sqlalchemy sqlglot diff --git a/providers/google/tests/system/google/resources_cleanup/.airflowignore b/providers/google/tests/system/google/resources_cleanup/.airflowignore new file mode 100644 index 0000000000000..72e8ffc0db8aa --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/.airflowignore @@ -0,0 +1 @@ +* diff --git a/providers/google/tests/system/google/resources_cleanup/.gitignore b/providers/google/tests/system/google/resources_cleanup/.gitignore new file mode 100644 index 0000000000000..9fe47b803c812 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/.gitignore @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ress.egg-info/ +*.egg-info/ +*.iml +__pycache__/ +build/ +dist/ +output/ +resources/ diff --git a/providers/google/tests/system/google/resources_cleanup/README.md b/providers/google/tests/system/google/resources_cleanup/README.md new file mode 100644 index 0000000000000..beedded177a56 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/README.md @@ -0,0 +1,110 @@ + + +# Google system test resource cleanup + +This helper project manages resources in a GCP project used for Google provider +system tests. It lives under `providers/google/tests` as test tooling and is not +included in the Google provider package. + +Create and activate a virtual environment, then install this helper from this +directory: + +```shell +pip install -e . +``` + +Now you can use `airflow-google-system-test-cleanup --help` to see the available +commands. + +Here is a sample output: + +```shell +usage: airflow-google-system-test-cleanup [-h] [--config-path CONFIG_PATH] [--resources-file-path RESOURCES_FILE_PATH] {list,list-asset-types,tree,delete} ... + +CLI to manage resources for a GCP project + +positional arguments: + {list,list-asset-types,tree,delete} + list Retrieve the GCP resources for the given GCP project + list-asset-types List all the unique asset types hierarchically in the GCP project + tree Show the resources hierarchically as an HTML file + delete Delete the resources for the given GCP project + +options: + -h, --help show this help message and exit + --config-path CONFIG_PATH + Direct path to a project config JSON file + --resources-file-path RESOURCES_FILE_PATH + Direct path to the resources.json file +``` + +## Global Options + +- `--config-path`: Override the automatic lookup of the project configuration. + Defaults to `config/<PROJECT_ID>.json`. +- `--resources-file-path`: Override where the tool saves or loads resource + data. Defaults to `resources/<PROJECT_ID>/resources.json`. 
+ +## Example usages + +- To retrieve all resources for the project and sync with Cloud Asset + Inventory: `airflow-google-system-test-cleanup list --project-id <PROJECT_ID> --sync` + + +- To list all resources from an existing previously synced file: + `airflow-google-system-test-cleanup list --project-id <PROJECT_ID>` + + +- To retrieve the resources of a specific asset type (e.g. `ai`) for the project: + `airflow-google-system-test-cleanup list --project-id <PROJECT_ID> --asset-type <ASSET_TYPE>` + +- To produce an HTML tree visualization using a specific config file (global + options such as `--config-path` go before the sub-command): + `airflow-google-system-test-cleanup --config-path /path/to/my_config.json tree --project-id <PROJECT_ID>` + +- To list all the unique asset types in a hierarchical tree: + `airflow-google-system-test-cleanup list-asset-types --project-id <PROJECT_ID>` + + > You can pass the `--asset-type` parameter to list only one type of asset. + +- To clean up resources: `airflow-google-system-test-cleanup delete --project-id <PROJECT_ID>` + +- To clean up only resources that are old enough: + `airflow-google-system-test-cleanup delete --project-id <PROJECT_ID> --asset-type dataproc --min-age-days 3` + +- To skip a service group during deletion, for example Composer: + `airflow-google-system-test-cleanup delete --project-id <PROJECT_ID> --skip-asset-type composer` + +## Development & Testing + +To install the development dependencies and run tests: + +```shell +pip install -e ".[test]" +pytest +``` + +## Building and Distribution + +To build the project as a Python wheel for distribution: + +1. Install the build tool: `pip install build` +2. Generate the wheel package: `python -m build` + +This will create the `dist/` directory containing the wheel file. 
diff --git a/providers/google/tests/system/google/resources_cleanup/__init__.py b/providers/google/tests/system/google/resources_cleanup/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__init__.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__main__.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__main__.py new file mode 100644 index 0000000000000..e5ce513fd026b --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/__main__.py @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import argparse
import asyncio
import inspect
import sys
import time
from collections.abc import Callable

from airflow_google_provider_resource_cleanup import constants as c
from airflow_google_provider_resource_cleanup.commands.cmd_delete import handle_delete
from airflow_google_provider_resource_cleanup.commands.cmd_list import handle_list
from airflow_google_provider_resource_cleanup.commands.cmd_list_asset_types import handle_list_asset_types
from airflow_google_provider_resource_cleanup.commands.cmd_tree import handler_tree

# Sub-command name -> handler. A handler may be a plain function or a
# coroutine function; ``main`` dispatches on which one it is.
HANDLERS = {
    "list": handle_list,
    "list-asset-types": handle_list_asset_types,
    "tree": handler_tree,
    "delete": handle_delete,
}


def _add_global_options(parser: argparse.ArgumentParser, *, on_subcommand: bool = False) -> None:
    """
    Register the options shared by every command on ``parser``.

    argparse sub-parsers do not inherit options from the root parser, so the
    options are registered on both. On a sub-parser the default is
    ``argparse.SUPPRESS``: the attribute is then only set when the option is
    actually given after the sub-command, and a value given before the
    sub-command is not overwritten by the sub-parser's default.
    """
    extra: dict[str, str] = {"default": argparse.SUPPRESS} if on_subcommand else {}
    parser.add_argument("--config-path", help="Direct path to a project config JSON file", **extra)
    parser.add_argument("--resources-file-path", help="Direct path to the resources.json file", **extra)


def _init_argparse() -> argparse.ArgumentParser:
    """
    Build the command line parser with the list/list-asset-types/tree/delete sub-commands.

    ``--config-path`` and ``--resources-file-path`` are accepted both before
    and after the sub-command name.
    """
    parser = argparse.ArgumentParser(
        prog="airflow-google-system-test-cleanup",
        description="CLI to manage resources for a GCP project",
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    # Global options (root level: default is None when not given at all)
    _add_global_options(parser)

    # command: list
    parser_list = subparsers.add_parser(
        "list",
        help="Retrieve the GCP resources for the given GCP project",
    )
    _add_global_options(parser_list, on_subcommand=True)
    parser_list.add_argument("--project-id", help="", required=True)
    parser_list.add_argument("--asset-type", help="", choices=c.ASSET_TYPE_OPTIONS)
    parser_list.add_argument(
        "--sync",
        action="store_true",
        default=False,
    )

    # command: list-asset-types
    parser_list_asset_types = subparsers.add_parser(
        "list-asset-types",
        help="List all the unique asset types hierarchically in the GCP project",
    )
    _add_global_options(parser_list_asset_types, on_subcommand=True)
    parser_list_asset_types.add_argument("--project-id", help="", required=True)
    parser_list_asset_types.add_argument("--asset-type", help="", choices=c.ASSET_TYPE_OPTIONS)

    # command: tree
    parser_tree = subparsers.add_parser(
        "tree",
        help="Show the resources hierarchically as an HTML file",
    )
    _add_global_options(parser_tree, on_subcommand=True)
    parser_tree.add_argument("--project-id", help="", required=True)
    parser_tree.add_argument("--asset-type", help="", choices=c.ASSET_TYPE_OPTIONS)

    # command: delete
    parser_delete = subparsers.add_parser(
        "delete",
        help="Delete the resources for the given GCP project",
    )
    _add_global_options(parser_delete, on_subcommand=True)
    parser_delete.add_argument("--project-id", help="", required=True)
    parser_delete.add_argument("--asset-type", help="", choices=c.ASSET_TYPE_OPTIONS)

    parser_delete.add_argument(
        "--min-age-days",
        type=int,
        help="Delete only resources created at least this many days ago",
    )
    parser_delete.add_argument(
        "--skip-asset-type",
        action="append",
        default=[],
        choices=c.ASSET_TYPE_OPTIONS,
        help="Asset type group to skip during deletion. Can be used multiple times.",
    )

    return parser


def _format_duration(duration: float) -> str:
    """
    Render a duration in seconds as "[H hours] [M minutes] S.SS seconds".

    The hour and minute parts appear as soon as they reach 1 (the previous
    ``> 1`` comparison dropped them for values in [1, 2), so 90 seconds was
    reported as "30.00 seconds").
    """
    hours, rem = divmod(duration, 3600)
    minutes, seconds = divmod(rem, 60)
    parts = []
    if hours >= 1:
        parts.append(f"{int(hours)} hours")
    if minutes >= 1:
        parts.append(f"{int(minutes)} minutes")
    parts.append(f"{seconds:.2f} seconds")
    return " ".join(parts)


def main():
    """
    Parse the command line, run the selected command and report its duration.

    Any ``Exception`` raised by the handler is printed between banner lines
    instead of a traceback; the duration is still printed, and the process
    then exits with status 1 so callers can detect the failure.
    """
    _parser = _init_argparse()
    _args: argparse.Namespace = _parser.parse_args()
    # Coroutine-function handlers return an awaitable, hence ``object``.
    handler: Callable[[argparse.ArgumentParser, argparse.Namespace], object] = HANDLERS[_args.command]
    start_time = time.monotonic()
    failed = False

    try:
        # inspect.iscoroutinefunction replaces the deprecated asyncio variant.
        if inspect.iscoroutinefunction(handler):
            asyncio.run(handler(_parser, _args))
        else:
            handler(_parser, _args)
    except Exception as e:
        failed = True
        print("-" * 10, "EXCEPTION DURING SCRIPT EXECUTION", "-" * 10)
        print(e)
        print("-" * 40)

    duration = time.monotonic() - start_time
    print("Script duration after parsing: ", _format_duration(duration))

    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()

# --- patch metadata that followed this module in the collapsed diff (kept as comments) ---
# diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/__init__.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/__init__.py
# new file mode 100644
# index 0000000000000..217e5db960782
# --- /dev/null
# +++
b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_delete.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_delete.py new file mode 100644 index 0000000000000..5791eb5e55b0e --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_delete.py @@ -0,0 +1,207 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import datetime
import re
import sys
from collections import defaultdict
from collections.abc import Mapping, Sequence
from typing import Any

import airflow_google_provider_resource_cleanup.constants as constants
from airflow_google_provider_resource_cleanup.handlers import DELETE_HANDLERS
from airflow_google_provider_resource_cleanup.helpers import (
    GCPProjectConfig,
    check_white_list,
    get_resources_file,
    get_tp_asset_data_item,
    load_json,
    provide_tp_asset_resource_file,
)

# Keys probed, in this order, when looking for a resource's creation time.
RESOURCE_CREATE_TIME_FIELDS = ("createTime", "creationTimestamp")

# Fractional-seconds part of a timestamp (the only "." in an ISO 8601 date-time).
_FRACTIONAL_SECONDS = re.compile(r"\.(\d+)")


def _parse_datetime(value: str) -> datetime.datetime:
    """
    Parse an ISO 8601 / RFC 3339 timestamp into an aware UTC ``datetime``.

    ``datetime.fromisoformat`` before Python 3.11 rejects a trailing ``Z`` and
    any fraction that is not exactly 3 or 6 digits, so the text is normalised
    first: ``Z`` becomes ``+00:00`` and the fraction is cut or zero-padded to
    microseconds. A timestamp without an offset is taken to be UTC.

    Raises:
        ValueError: if the text is not a parseable timestamp.
    """
    text = value
    if text.endswith(("Z", "z")):
        text = f"{text[:-1]}+00:00"
    text = _FRACTIONAL_SECONDS.sub(lambda m: "." + m.group(1)[:6].ljust(6, "0"), text, count=1)

    dt = datetime.datetime.fromisoformat(text)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)


def _parse_create_time(value: Any) -> datetime.datetime | None:
    """Return ``value`` parsed as a UTC datetime, or ``None`` when it is empty or unparseable."""
    if not value:
        return None
    try:
        return _parse_datetime(str(value))
    except ValueError:
        return None


def _get_resource_create_time(resource: Mapping[str, Any]) -> datetime.datetime | None:
    """
    Find the creation time of ``resource``.

    The resource itself is searched first, then its ``additionalAttributes``
    mapping (when present); within each, ``RESOURCE_CREATE_TIME_FIELDS`` are
    tried in order. Returns ``None`` when no field holds a parseable value.
    """
    locations: list[Mapping[str, Any]] = [resource]

    # Some resources have the create time nested in the additionalAttributes field.
    additional_attributes = resource.get("additionalAttributes", {})
    if isinstance(additional_attributes, Mapping):
        locations.append(additional_attributes)

    for location in locations:
        for field in RESOURCE_CREATE_TIME_FIELDS:
            create_time = _parse_create_time(location.get(field))
            if create_time:
                return create_time

    return None


def check_min_age(
    resource: Mapping[str, Any],
    min_age_days: int | None,
    now: datetime.datetime | None = None,
) -> bool:
    """
    Tell whether ``resource`` is old enough to be deleted.

    Args:
        resource: resource mapping; ``name`` is used in messages only.
        min_age_days: minimum age in days, or ``None`` to disable the check.
        now: reference time; defaults to the current UTC time.

    Returns:
        ``True`` when the check is disabled or the resource is at least
        ``min_age_days`` old. ``False`` (with a printed explanation) when it
        is younger or when its creation time cannot be determined.
    """
    if min_age_days is None:
        return True

    created_at = _get_resource_create_time(resource)
    resource_name = resource.get("name", "N/a")
    if created_at is None:
        # Unknown age: refuse to delete rather than guess.
        print(
            f'The resource: "{resource_name}" does not have a known creation time. '
            f"Skipping because --min-age-days={min_age_days} is enabled."
        )
        return False

    now = now or datetime.datetime.now(datetime.timezone.utc)
    age = now - created_at
    if age < datetime.timedelta(days=min_age_days):
        print(
            f'The resource: "{resource_name}" is younger than {min_age_days} day(s). '
            f"Created at: {created_at.isoformat()}. Skipping."
        )
        return False

    return True


async def handle_asset_type(
    asset_type: str,
    resources: Sequence[Mapping[str, Any]],
    cfg: GCPProjectConfig,
    min_age_days: int | None = None,
    check_white_list_fn=check_white_list,
    delete_handlers=DELETE_HANDLERS,
):
    """
    Delete the resources of one asset type group that pass all safety filters.

    A resource is kept for deletion only if ``check_white_list_fn(resource, cfg)``
    is truthy and ``check_min_age`` accepts it. Nothing happens when no
    resource survives or when ``delete_handlers`` has no entry for
    ``asset_type``; otherwise the handler class is instantiated and its
    ``handle`` coroutine is awaited with the surviving resources.
    """
    safe_resources = [
        resource
        for resource in resources
        if check_white_list_fn(resource, cfg) and check_min_age(resource, min_age_days)
    ]

    if not safe_resources:
        print(f'No safe resources found for the asset type: "{asset_type}"! Passing...')
        return

    if asset_type not in delete_handlers:
        print(f'There is no delete handler implemented for the asset type: "{asset_type}"! Passing...')
        return

    if asset_type not in constants.TENANT_PROJECT_ASSET_TYPES:
        print(f'{len(resources)} resources found for the asset type: "{asset_type}"!. Deleting...')
    else:
        print(f"Tenant project resource clean up requested: '{asset_type}'. Deleting attempt...")

    handler = delete_handlers[asset_type]()

    await handler.handle(safe_resources)


def _categorize_resources(
    resources: Sequence[Mapping[str, Any]],
    asset_type_patterns: Mapping[str, str],
) -> defaultdict[str, list[Mapping[str, Any]]]:
    """
    Group ``resources`` by asset type group.

    Each pattern is reduced to a prefix by dropping its last dot-separated
    segment; a resource joins a group when its ``assetType`` starts with that
    prefix. Resources without a string ``assetType`` are ignored instead of
    raising ``AttributeError``.
    """
    categorized: defaultdict[str, list[Mapping[str, Any]]] = defaultdict(list)
    for group in asset_type_patterns:
        prefix = asset_type_patterns[group].rpartition(".")[0]
        for resource in resources:
            resource_asset_type = resource.get("assetType")
            if isinstance(resource_asset_type, str) and resource_asset_type.startswith(prefix):
                categorized[group].append(resource)
    return categorized


async def handle_delete(
    _: argparse.ArgumentParser,
    args: argparse.Namespace,
    gcp_project_config_cls=GCPProjectConfig,
    provide_tp_asset_resource_file_fn=provide_tp_asset_resource_file,
    get_resources_file_fn=get_resources_file,
    load_json_fn=load_json,
    get_tp_asset_data_item_fn=get_tp_asset_data_item,
    handle_asset_type_fn=handle_asset_type,
):
    """
    Entry point of the ``delete`` command.

    Reads ``asset_type``, ``project_id``, ``min_age_days``, ``skip_asset_type``,
    ``config_path`` and ``resources_file_path`` from ``args``. With an asset
    type, only that group is processed; without one, every categorised group
    plus the ``tp_vertex_ai_raycluster`` tenant resource is processed, except
    the groups listed in ``--skip-asset-type``. The ``*_fn`` / ``*_cls``
    keyword parameters default to the real helpers and exist so they can be
    substituted.

    Exits with status 1 when ``--min-age-days`` is negative or the resources
    file does not exist.
    """
    asset_type = args.asset_type
    project_id = args.project_id
    min_age_days = args.min_age_days
    skip_asset_types = args.skip_asset_type

    if min_age_days is not None and min_age_days < 0:
        print("--min-age-days cannot be negative! Exiting...")
        sys.exit(1)

    if asset_type in skip_asset_types:
        print(f'The requested asset type "{asset_type}" is listed in --skip-asset-type. Passing...')
        return

    cfg = gcp_project_config_cls.load_from_file(project_id, config_path=args.config_path)

    is_tenant_asset_type = asset_type in constants.TENANT_PROJECT_ASSET_TYPES
    if is_tenant_asset_type:
        provide_tp_asset_resource_file_fn(
            project_id,
            asset_type,
            resource_file_path=args.resources_file_path,
            config_path=args.config_path,
        )

    resource_file = get_resources_file_fn(project_id, asset_type, resource_file_path=args.resources_file_path)
    if not resource_file.exists():
        print(f'The resources file cannot be found: "{resource_file.absolute()}"! Exiting...')
        print('You may need to run "list" command first to retrieve the updated resources! Exiting...')
        sys.exit(1)
    print(f'The resources file loading: "{resource_file.absolute()}"...')
    resources = load_json_fn(resource_file)

    patterns = constants.TENANT_PROJECT_ASSET_TYPES if is_tenant_asset_type else constants.ASSET_TYPES
    categorized_resources = _categorize_resources(resources, patterns)

    if asset_type is not None:
        resource_list = categorized_resources[asset_type]
        await handle_asset_type_fn(asset_type, resource_list, cfg, min_age_days=min_age_days)
        return

    # clean all resources
    # append tenant project resource, that was not listed in the project resources file
    # tp_vertex_ai_raycluster
    categorized_resources["tp_vertex_ai_raycluster"] = [
        get_tp_asset_data_item_fn(project_id, asset_type="tp_vertex_ai_raycluster")
    ]
    for group, resource_list in categorized_resources.items():
        if group in skip_asset_types:
            print(f'The asset type "{group}" is listed in --skip-asset-type. Passing...')
            continue
        await handle_asset_type_fn(group, resource_list, cfg, min_age_days=min_age_days)

# --- patch metadata that followed this module in the collapsed diff (kept as comments) ---
# diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list.py
# new file mode 100644
# index 0000000000000..9ad85abb6cda2
# --- /dev/null
# +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list.py
# @@ -0,0 +1,98 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
from pathlib import Path

import airflow_google_provider_resource_cleanup.constants as c
from airflow_google_provider_resource_cleanup.helpers import (
    GCPProjectConfig,
    check_white_list,
    ensure_path,
    get_resources_file,
    init_directories,
    load_json,
    run_command,
)


def _sync_resources(system_tests_project, resource_file: Path, resource_type: str | None = None):
    """
    Dump the project's resources into ``resource_file`` via ``gcloud asset search-all-resources``.

    When ``resource_type`` is given, the search is narrowed with
    ``--asset-types`` using the pattern stored in ``c.ASSET_TYPES``. The
    command string (including the ``>`` redirection) is passed to
    ``run_command``.
    """
    search = f"gcloud asset search-all-resources --scope projects/{system_tests_project} "
    if resource_type is not None:
        pattern = c.ASSET_TYPES[resource_type]
        command = search + f'--asset-types="{pattern}" --format json > "{resource_file}"'
    else:
        command = search + f'--format json > "{resource_file}"'

    run_command(command)


def _print_resources(resources_file, cfg: GCPProjectConfig):
    """
    Print every resource of ``resources_file`` accepted by ``check_white_list``.

    Each accepted resource is shown as name, location and asset type, followed
    by its labels when it has any. A summary with the shown, ignored and total
    counts is printed at the end.

    Raises:
        TypeError: if the file does not contain a JSON list.
    """
    loaded = load_json(resources_file)
    if not isinstance(loaded, list):
        raise TypeError(f'Expected resources file "{resources_file}" to contain a JSON list.')

    shown = 0
    ignored = 0
    for entry in loaded:
        if not check_white_list(entry, cfg):
            ignored += 1
            continue

        shown += 1
        entry_name = entry.get("name", "N/a")
        entry_location = entry.get("location", "N/a")
        entry_asset_type = entry.get("assetType", "N/a")
        print(f"({entry_name})", f"@{entry_location}", entry_asset_type, sep=" >--< ")

        entry_labels = entry.get("labels", {})
        if not entry_labels:
            continue
        print("\tLabels:")
        for key, value in entry_labels.items():
            print(f'\t\t{key}="{value}"')

    print("-" * 120)
    print("Resource Count :", shown)
    print("Ignored Resource Count :", ignored)
    print("Total Count :", shown + ignored)


def handle_list(_: argparse.ArgumentParser, args: argparse.Namespace):
    """
    Entry point of the ``list`` command.

    With ``--sync`` the resources file is regenerated and then printed.
    Without it the existing file is printed; if that raises
    ``FileNotFoundError`` the file is generated first and printed afterwards.
    """
    init_directories()
    project_id = args.project_id
    asset_type = args.asset_type

    resource_file = get_resources_file(project_id, asset_type, resource_file_path=args.resources_file_path)
    cfg = GCPProjectConfig.load_from_file(project_id, config_path=args.config_path)

    def refresh_then_print():
        # Make sure the target directory exists before gcloud redirects into it.
        ensure_path(resource_file.parent)
        _sync_resources(project_id, resource_file, asset_type)
        _print_resources(resource_file, cfg)

    if args.sync:
        refresh_then_print()
        return

    try:
        _print_resources(resource_file, cfg)
    except FileNotFoundError:
        print(f"Resource file not found: {resource_file}. Syncing resources...")
        refresh_then_print()

# --- patch metadata that followed this module in the collapsed diff (kept as comments) ---
# diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list_asset_types.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list_asset_types.py
# new file mode 100644
# index 0000000000000..4ee53e7f0913e
# --- /dev/null
# +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/commands/cmd_list_asset_types.py
# @@ -0,0 +1,122 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import sys
from pathlib import Path
from typing import Any

from airflow_google_provider_resource_cleanup.helpers import get_resources_file, load_json


def __build_hierarchy(assets: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """
    Build a tree of asset types from a flat list of assets.

    Args:
        assets: Asset dictionaries. Each must have an "assetType" key; a missing
            "parentAssetType" is treated as the placeholder parent "N/A".

    Returns:
        The root nodes keyed by asset type. Every node has the shape
        ``{"children": {asset_type: node, ...}}``. A root is a type that only ever
        appears as a parent, never as an item's own "assetType".

    Raises:
        KeyError: if an asset has no "assetType" key.
    """
    nodes: dict[str, dict[str, Any]] = {}
    child_asset_types: set[str] = set()

    # Node creation and parent/child linking happen in one pass; roots can only be
    # determined afterwards, once every asset type has been seen.
    for item in assets:
        asset_type = item["assetType"]
        parent_type = item.get("parentAssetType", "N/A")
        child_asset_types.add(asset_type)

        child_node = nodes.setdefault(asset_type, {"children": {}})
        parent_node = nodes.setdefault(parent_type, {"children": {}})
        parent_node["children"][asset_type] = child_node

    return {asset_type: node for asset_type, node in nodes.items() if asset_type not in child_asset_types}


def _get_asset_hierarchy(resource_file: Path) -> dict[str, dict[str, Any]]:
    """
    Load ``resource_file`` and return the asset-type hierarchy of its content.

    Raises:
        TypeError: if the loaded JSON document is not a list.
    """
    resources = load_json(resource_file)
    if not isinstance(resources, list):
        raise TypeError(f'Expected resources file "{resource_file}" to contain a JSON list.')
    return __build_hierarchy(resources)


def _print_tree(nodes: dict[str, dict[str, Any]], prefix: str = ""):
    """
    Print the hierarchy in a tree-like format, siblings sorted by name.

    Args:
        nodes: The dictionary representing the hierarchy to print.
        prefix: The prefix string used for indentation and tree lines.
    """
    keys = sorted(nodes)
    last_index = len(keys) - 1
    for index, key in enumerate(keys):
        is_last = index == last_index
        print(prefix + ("└── " if is_last else "├── ") + key)

        children = nodes[key].get("children", {})
        if children:
            # The continuation prefix must be exactly as wide as the 4-character
            # connector above, otherwise nested levels do not line up.
            _print_tree(children, prefix + ("    " if is_last else "│   "))


def handle_list_asset_types(_: argparse.ArgumentParser, args: argparse.Namespace):
    """
    Entry point of the list-asset-types command: print the asset-type tree of a resource file.

    Exits with status 1 when loading the resource file raises ``FileNotFoundError``.
    """
    project_id = args.project_id
    asset_type = args.asset_type

    resource_file = get_resources_file(project_id, asset_type, resource_file_path=args.resources_file_path)

    try:
        asset_types = _get_asset_hierarchy(resource_file)
    except FileNotFoundError:
        print(f'The resource file "{resource_file}" cannot be found! Exiting...')
        sys.exit(1)

    print("=" * 200)
    _print_tree(asset_types)
import argparse
from collections.abc import Sequence
from pathlib import Path
from typing import Any, Dict, List, Optional

from jinja2 import Environment, PackageLoader, Template

import airflow_google_provider_resource_cleanup.constants as c
from airflow_google_provider_resource_cleanup.helpers import (
    GCPProjectConfig,
    check_white_list,
    get_resources_file,
    init_directories,
    load_json,
    run_command,
)

_env = Environment(loader=PackageLoader("airflow_google_provider_resource_cleanup", "templates"))


def _format_node_for_template(node_details: Dict[str, Any]) -> Dict[str, Any]:
    """
    Recursively convert a tree node into the flat dictionary the templates consume.

    ``node_details`` has the shape ``{"data": {...}, "children": {key: node, ...}}``.
    Children are emitted as a list sorted by their display name.
    """
    data = node_details["data"]
    children = node_details["children"]

    ordered_children = sorted(children.values(), key=lambda child: child["data"].get("name", "N/A"))

    return {
        "display_name": data.get("name", "N/A"),
        # Only the last path segment of the asset type, e.g. "Dataset".
        "asset_type_short": data.get("type", "N/A").split("/")[-1],
        "resource_id": data.get("id", "N/A"),
        "location": data.get("location", "N/A"),
        "is_protected": data.get("is_protected", False),
        "children": [_format_node_for_template(child) for child in ordered_children],
    }


def _build_resource_tree(resources_data: Sequence[Dict[str, Any]], cfg: GCPProjectConfig) -> List[Dict[str, Any]]:
    """
    Build a hierarchical tree of resources, sorted and formatted for the templates.

    Resources are linked to their parent through "parentFullResourceName". A parent that is
    not itself part of ``resources_data`` is represented by a protected placeholder node.
    Resources without a "name" are ignored.

    Returns:
        The formatted top-level nodes (those never registered as a child), sorted by name.
    """
    all_nodes: dict[str, dict[str, Any]] = {}
    all_resource_names: set[str] = set()
    child_resource_names: set[str] = set()

    # Pass 1: create one node per named resource.
    for resource in resources_data:
        resource_name = resource.get("name")
        if not resource_name:
            continue

        last_segment = resource_name.split("/")[-1]
        all_nodes[resource_name] = {
            "data": {
                "name": resource.get("displayName", last_segment),
                "type": resource.get("assetType", "N/a"),
                "id": resource.get("project", last_segment),
                "location": resource.get("location", "N/a"),
                # A resource rejected by the white list check is shown as protected.
                "is_protected": not check_white_list(resource, cfg, silent=True),
            },
            "children": {},
        }
        all_resource_names.add(resource_name)

        if resource.get("parentFullResourceName"):
            child_resource_names.add(resource_name)

    # Pass 2: attach every child to its parent, creating placeholders for unknown parents.
    for resource in resources_data:
        res_name = resource.get("name")
        parent_name = resource.get("parentFullResourceName")

        if not isinstance(res_name, str):
            continue
        if not parent_name or res_name not in all_nodes:
            continue

        if parent_name not in all_nodes:
            parent_parts = parent_name.split("/")
            # The collection segment preceding the id (e.g. "projects") names the placeholder type.
            parent_type = parent_parts[-2].capitalize() if len(parent_parts) > 1 else "Unknown"

            all_nodes[parent_name] = {
                "data": {
                    "name": parent_parts[-1],
                    "type": f"cloudresourcemanager.googleapis.com/{parent_type}",
                    "id": parent_parts[-1],
                    "location": "unk",
                    "is_protected": True,
                },
                "children": {},
            }
            all_resource_names.add(parent_name)

        all_nodes[parent_name]["children"][res_name] = all_nodes[res_name]

    top_level_names = sorted(
        all_resource_names - child_resource_names,
        key=lambda name: all_nodes[name]["data"].get("name", "N/A"),
    )

    return [_format_node_for_template(all_nodes[name]) for name in top_level_names]


def print_tree(nodes_list: List[Dict[str, Any]], indent: int = 0) -> None:
    """Print formatted nodes as a tab-indented bullet list, one tab per nesting level."""
    indent_str = "\t" * indent
    for node in nodes_list:
        protected_str = " (Protected)" if node["is_protected"] else ""
        print(
            f"{indent_str}- {node['display_name']} "
            f"({node['asset_type_short']} @ {node['location']}){protected_str}"
        )

        if node["children"]:
            print_tree(node["children"], indent + 1)


def _get_html_template(resource_type: Optional[str] = None) -> Template:
    """Return the "resources.html" template, or "resource_name.html" when a type is given."""
    template_file_name = "resources.html" if resource_type is None else "resource_name.html"
    return _env.get_template(template_file_name)


def _get_html_file(system_tests_project: str, resource_type: Optional[str] = None) -> Path:
    """
    Return the path of the ``index.html`` output file, creating its parent directories.

    The file lives under ``c.OUTPUT_FOLDER / <project>``, in a ``<resource_type>`` sub-folder
    when a resource type is given.
    """
    target_dir = c.OUTPUT_FOLDER / system_tests_project
    if resource_type is not None:
        target_dir = target_dir / resource_type

    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir / "index.html"


def handler_tree(_: argparse.ArgumentParser, args: argparse.Namespace) -> None:
    """
    Entry point of the tree command: render the resource tree to HTML and open it.

    Raises:
        SystemExit: with status 1 when the resource file does not exist.
        TypeError: if the resource file does not contain a JSON list.
    """
    init_directories()
    project_id = args.project_id
    asset_type = args.asset_type

    resource_file = get_resources_file(project_id, asset_type, resource_file_path=args.resources_file_path)
    if not resource_file.exists():
        print(f'The resource file "{resource_file}" cannot be found! Exiting...')
        # The builtin ``exit`` is injected by the ``site`` module and is not guaranteed to
        # exist (e.g. under ``python -S``); raising SystemExit is the equivalent that always works.
        raise SystemExit(1)

    cfg = GCPProjectConfig.load_from_file(project_id, config_path=args.config_path)
    data = load_json(resource_file)
    if not isinstance(data, list):
        raise TypeError(f'Expected resources file "{resource_file}" to contain a JSON list.')
    resource_tree = _build_resource_tree(data, cfg)

    html_content = _get_html_template(asset_type).render(
        project_name=project_id,
        resource_type=asset_type if asset_type is not None else "Resource",
        resource_tree=resource_tree,
    )

    html_file = _get_html_file(project_id, asset_type)
    # Explicit encoding: the locale default may be unable to encode resource display names.
    with html_file.open("w", encoding="utf-8") as f:
        f.write(html_content)

    print(f"The output file created: {str(html_file)}! Opening...")
    # NOTE(review): "open" looks like the macOS launcher; presumably other platforms are
    # out of scope for this tool - confirm.
    run_command(f"open {html_file.absolute()}")
from pathlib import Path

# Asset type patterns that are kept separate and merged into ASSET_TYPES below.
TENANT_PROJECT_ASSET_TYPES = {"tp_vertex_ai_raycluster": "tp_vertex_ai_raycluster.*"}

# Short command-line name -> asset type pattern.
# Some patterns are reachable through two names ("gke"/"container", "dataproc_metastore"/"metastore").
ASSET_TYPES = {
    "ai": "aiplatform.googleapis.com.*",
    "alloydb": "alloydb.googleapis.com.*",
    "artifact": "artifactregistry.googleapis.com.*",
    "batch": "batch.googleapis.com.*",
    "bq": "bigquery.googleapis.com.*",
    "bqtransfer": "bigquerydatatransfer.googleapis.com.*",
    "bigtable": "bigtableadmin.googleapis.com.*",
    "cloudbuild": "cloudbuild.googleapis.com.*",
    "cloudtasks": "cloudtasks.googleapis.com.*",
    "composer": "composer.googleapis.com.*",
    "compute": "compute.googleapis.com.*",
    "dataflow": "dataflow.googleapis.com.*",
    "dataform": "dataform.googleapis.com.*",
    "datafusion": "datafusion.googleapis.com.*",
    "dataplex": "dataplex.googleapis.com.*",
    "dataproc": "dataproc.googleapis.com.*",
    "dataproc_metastore": "metastore.googleapis.com.*",
    "dlp": "dlp.googleapis.com.*",
    "firestore": "firestore.googleapis.com.*",
    "gke": "container.googleapis.com.*",
    "logging": "logging.googleapis.com.*",
    "memcache": "memcache.googleapis.com.*",
    "metastore": "metastore.googleapis.com.*",
    "monitoring": "monitoring.googleapis.com.*",
    "pubsub": "pubsub.googleapis.com.*",
    "run": "run.googleapis.com.*",
    "sqladmin": "sqladmin.googleapis.com.*",
    "storage": "storage.googleapis.com.*",
    "storagetransfer": "storagetransfer.googleapis.com.*",
    "kafka": "managedkafka.googleapis.com.*",
    "workflows": "workflows.googleapis.com.*",
    "firebase": "firebaserules.googleapis.com.*",
    "container": "container.googleapis.com.*",
    "spanner": "spanner.googleapis.com.*",
} | TENANT_PROJECT_ASSET_TYPES

ASSET_TYPE_OPTIONS = list(ASSET_TYPES.keys())

# All folders are resolved against the working directory at import time.
_CWD = Path.cwd()
RESOURCES_FOLDER = _CWD / "resources"
OUTPUT_FOLDER = _CWD / "output"
CONFIG_FOLDER = _CWD / "config"

DO_NOT_DELETE_LABELS = {
    "owner",
    "do-not-delete",
    "dont-delete",
    "donotdelete",
    "dontdelete",
}

# Every asset type is listed exactly once: repeated literals in a set display are
# silently collapsed and only hide copy/paste mistakes.
DO_NOT_DELETE_ASSET_TYPES = {
    "cloudresourcemanager.googleapis.com/Organization",
    "cloudresourcemanager.googleapis.com/Project",
    "cloudbilling.googleapis.com/ProjectBillingInfo",
    "iam.googleapis.com/ServiceAccount",
    "iam.googleapis.com/ServiceAccountKey",
    "compute.googleapis.com/Network",  # avoid deleting networks
    "compute.googleapis.com/Route",  # avoid deleting auto peering routes
    "osconfig.googleapis.com/OSPolicyAssignment",
    "serviceusage.googleapis.com/Service",
    "apikeys.googleapis.com/Key",
    "secretmanager.googleapis.com/Secret",
    "secretmanager.googleapis.com/SecretVersion",
    "servicedirectory.googleapis.com/Namespace",
    "servicenetworking.googleapis.com/Connection",
    "dataproc.googleapis.com/AutoscalingPolicy",
}
from typing import Type

from ._base import BaseDeleteHandler
from .ai import AIPlatformDeleteHandler
from .bq import BigQueryDeleteHandler
from .composer import ComposerDeleteHandler
from .dataform import DataformDeleteHandler
from .dataplex import DataplexDeleteHandler
from .dataproc import DataprocDeleteHandler
from .dlp import DLPDeleteHandler
from .logging import LoggingDeleteHandler
from .ray import RayClusterOnVertexAIDeleteHandler
from .sqladmin import CloudSQLDeleteHandler
from .storage import StorageDeleteHandler

# Registry: short resource-type name -> delete handler class (the class, not an instance).
# Every key used here also appears as a key of constants.ASSET_TYPES.
# Two pairs share one handler class: "bq"/"bqtransfer" and "storage"/"storagetransfer".
DELETE_HANDLERS: dict[str, Type[BaseDeleteHandler]] = {
    "ai": AIPlatformDeleteHandler,
    "bq": BigQueryDeleteHandler,
    "bqtransfer": BigQueryDeleteHandler,
    "composer": ComposerDeleteHandler,
    "sqladmin": CloudSQLDeleteHandler,
    "dataform": DataformDeleteHandler,
    "dataplex": DataplexDeleteHandler,
    "dataproc": DataprocDeleteHandler,
    "dlp": DLPDeleteHandler,
    "logging": LoggingDeleteHandler,
    "storage": StorageDeleteHandler,
    "storagetransfer": StorageDeleteHandler,
    "tp_vertex_ai_raycluster": RayClusterOnVertexAIDeleteHandler,
}
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import traceback +from collections import defaultdict +from collections.abc import Awaitable, Callable + + +class BaseDeleteHandler: + # The function signature for the DELETERS's value: def f(resource: dict, prefix: str) + DELETERS: dict[str, Callable[[dict, str], Awaitable[bool | None]]] = {} + DELETION_ORDER: list[str] = [] # list of asset types to delete by order + SEMAPHORE_COUNT: int = 10 + SLEEP_AFTER_EACH_REQUEST = 0.01 + + def __init__(self): + if not self.DELETERS or not self.DELETION_ORDER: + raise NotImplementedError("To be able to use this handler you need to define DELETERS and DELETION_ORDER!") + + def categorize_resources(self, resources): + categorized_resources = defaultdict(list) + for resource in resources: + asset_type = resource.get("assetType") + key = asset_type if asset_type in self.DELETERS else "__UNKNOWN__" + categorized_resources[key].append(resource) + + return categorized_resources + + async def delete_single_resource( + self, resource: dict, counter: int, total_count: int, semaphore: asyncio.Semaphore + ): + async with semaphore: + prefix = f"[{counter}/{total_count}] " + asset_type = resource.get("assetType") + if asset_type not in self.DELETERS: + print(f"{prefix}Unknown asset type:", asset_type) + else: + deleter = self.DELETERS[asset_type] + if await deleter(resource, prefix) is not False: + print(f"{prefix}Resource deleted!") + + await 
asyncio.sleep(self.SLEEP_AFTER_EACH_REQUEST) + + async def handle(self, resources: list[dict]): + categorized_resources = self.categorize_resources(resources) + + semaphore = asyncio.Semaphore(self.SEMAPHORE_COUNT) + + for asset_type in self.DELETION_ORDER: + if asset_type not in categorized_resources: + print(f'Unknown asset type: "{asset_type}" in DELETION_ORDER! Passing...') + continue + + resource_list = categorized_resources[asset_type] + + resource_count = len(resource_list) + print(f"{asset_type} => {resource_count} resource(s) found! Deleting...") + + results = await asyncio.gather( + *[ + self.delete_single_resource(resource, idx, resource_count, semaphore) + for idx, resource in enumerate(resource_list, 1) + ], + return_exceptions=True, + ) + + # Expose errors silenced by asyncio.gather() call + for result in results: + if isinstance(result, Exception): + print("Found an error in the handle deletion results: ") + traceback.print_exception(type(result), result, result.__traceback__) diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/ai.py b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/ai.py new file mode 100644 index 0000000000000..86b62ad657133 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/ai.py @@ -0,0 +1,259 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
from urllib.parse import urlencode

from google.cloud import aiplatform

from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import (
    get_aiplatform_client_options,
    get_resource_path,
    run_command_async,
)

# Seconds to wait for a long running delete operation to finish.
TIMEOUT = 600


def aiplatform_client(project_id: str, location: str):
    """Initialise the ``aiplatform`` SDK for the given project/location and return the module."""
    aiplatform.init(project=project_id, location=location)
    return aiplatform


async def __delete_via_curl(
    func_name: str,
    asset_description: str,
    resource: dict,
    prefix: str,
    params: dict | None = None,
) -> bool | None:
    """
    Delete a resource by sending an HTTP DELETE to the v1 REST endpoint through ``curl``.

    Args:
        func_name: Name of the calling deleter, used in log lines only.
        asset_description: Human readable asset kind, used in the error message only.
        resource: The asset dictionary to delete.
        prefix: Log prefix.
        params: Optional query parameters appended to the request URL.

    Returns:
        ``False`` when an exception was raised (it is printed, not propagated), so that the
        caller does not report the resource as deleted; ``None`` otherwise.
    """
    try:
        client_options = get_aiplatform_client_options(resource)
        name = get_resource_path(resource)
        api_base = client_options["api_endpoint"]
        q = f"?{urlencode(params)}" if params is not None else ""
        url = f"https://{api_base}/v1/{name}{q}"
        # The access token is resolved by the shell at execution time.
        cmd = f"""
        curl -X DELETE \
            -H "Authorization: Bearer $(gcloud auth print-access-token)" \
            "{url}"
        """
        print(f"{prefix} [{func_name}] Long running operation:", name)
        await run_command_async(cmd)
        print(f"{prefix} [{func_name}] DONE:", name)
    except Exception as e:
        print(f"{prefix} Error while deleting the {asset_description.lower()}!", e)
        return False
    return None


async def __delete_via_client(
    func_name: str, asset_desc: str, deleter, resource: dict, prefix: str
) -> bool | None:
    """
    Delete a resource through an async GAPIC client method and wait for the operation.

    Args:
        func_name: Name of the calling deleter, used in log lines only.
        asset_desc: Human readable asset kind, used in the error message only.
        deleter: Bound async client method accepting ``name=...`` and returning an operation.
        resource: The asset dictionary to delete.
        prefix: Log prefix.

    Returns:
        ``False`` when an exception was raised (it is printed, not propagated), so that the
        caller does not report the resource as deleted; ``None`` otherwise.
    """
    try:
        name = get_resource_path(resource)

        operation = await deleter(name=name)
        print(f"{prefix} [{func_name}] Long running operation:", operation.operation.name)
        # Operations returned by the async clients expose result() as a coroutine: without
        # the await nothing is waited for and the "response" is a bare coroutine object.
        result = await operation.result(timeout=TIMEOUT)
        print(f"{prefix} [{func_name}] Response:", result)
    except Exception as e:
        print(f"{prefix} Error while deleting the {asset_desc.lower()}!", e)
        return False
    return None


async def _delete_model(resource: dict, prefix: str):
    """Delete a Model through ``ModelServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.ModelServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_model",
        "model",
        client.delete_model,
        resource,
        prefix,
    )


async def _delete_endpoint(resource: dict, prefix: str):
    """Delete an Endpoint through ``EndpointServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.EndpointServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_endpoint",
        "endpoint",
        client.delete_endpoint,
        resource,
        prefix,
    )


async def _delete_dataset(resource: dict, prefix: str):
    """Delete a Dataset through ``DatasetServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.DatasetServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_dataset",
        "dataset",
        client.delete_dataset,
        resource,
        prefix,
    )


async def _delete_training_pipeline(resource: dict, prefix: str):
    """Delete a TrainingPipeline through ``PipelineServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.PipelineServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_training_pipeline",
        "training pipeline",
        client.delete_training_pipeline,
        resource,
        prefix,
    )


async def _delete_pipeline_job(resource: dict, prefix: str):
    r"""
    Delete a PipelineJob over REST.

    curl -X DELETE \
        -H "Authorization: Bearer $(gcloud auth print-access-token)" \
        "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/pipelineJobs/PIPELINE_RUN_ID"
    """
    return await __delete_via_curl(
        "_delete_pipeline_job",
        "pipeline job",
        resource,
        prefix,
    )


async def _delete_tuning_job(resource: dict, prefix: str):
    """Report the TuningJob as skipped and return ``False``; nothing is deleted."""
    print(
        f"{prefix} [_delete_tuning_job] TuningJob {get_resource_path(resource)} "
        f"skipped because Google Cloud does not support deleting TuningJobs. "
        f"See https://cloud.google.com/vertex-ai/generative-ai/docs/models/gemini-supervised-tuning#quota"
    )
    return False


async def _delete_batch_prediction_job(resource: dict, prefix: str):
    """Delete a BatchPredictionJob through ``JobServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.JobServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_batch_prediction_job",
        "batch prediction job",
        client.delete_batch_prediction_job,
        resource,
        prefix,
    )


async def _delete_hyperparameter_tuning_job(resource: dict, prefix: str):
    """Delete a HyperparameterTuningJob through ``JobServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.JobServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_hyperparameter_tuning_job",
        "hyperparameter tuning job",
        client.delete_hyperparameter_tuning_job,
        resource,
        prefix,
    )


async def _delete_feature_online_store(resource: dict, prefix: str):
    r"""
    Delete a FeatureOnlineStore over REST, passing ``force=true``.

    curl -X DELETE \
        -H "Authorization: Bearer $(gcloud auth print-access-token)" \
        "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featureOnlineStores/FEATUREONLINESTORE_NAME?force=BOOLEAN"
    """
    return await __delete_via_curl(
        "_delete_feature_online_store", "feature online store", resource, prefix, params={"force": "true"}
    )


async def _delete_cached_content(resource: dict, prefix: str):
    """Delete a CachedContent over REST."""
    return await __delete_via_curl(
        "_delete_cached_content",
        "cached content",
        resource,
        prefix,
    )


async def _delete_tensorboard(resource: dict, prefix: str):
    """Delete a Tensorboard through ``TensorboardServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.TensorboardServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_tensorboard",
        "tensorboard",
        client.delete_tensorboard,
        resource,
        prefix,
    )


async def _delete_custom_job(resource: dict, prefix: str):
    """Delete a CustomJob through ``JobServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.JobServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_custom_job",
        "custom job",
        client.delete_custom_job,
        resource,
        prefix,
    )


async def _delete_metadata_store(resource: dict, prefix: str):
    """Delete a MetadataStore through ``MetadataServiceAsyncClient``."""
    client_options = get_aiplatform_client_options(resource)
    client = aiplatform.gapic.MetadataServiceAsyncClient(client_options=client_options)
    return await __delete_via_client(
        "_delete_metadata_store",
        "metadata store",
        client.delete_metadata_store,
        resource,
        prefix,
    )


class AIPlatformDeleteHandler(BaseDeleteHandler):
    """Delete handler for the ``aiplatform.googleapis.com`` asset types listed in ``DELETERS``."""

    DELETERS = {
        "aiplatform.googleapis.com/Dataset": _delete_dataset,
        "aiplatform.googleapis.com/BatchPredictionJob": _delete_batch_prediction_job,
        "aiplatform.googleapis.com/CustomJob": _delete_custom_job,
        "aiplatform.googleapis.com/HyperparameterTuningJob": _delete_hyperparameter_tuning_job,
        "aiplatform.googleapis.com/Model": _delete_model,
        "aiplatform.googleapis.com/TrainingPipeline": _delete_training_pipeline,
        "aiplatform.googleapis.com/Endpoint": _delete_endpoint,
        "aiplatform.googleapis.com/FeatureOnlineStore": _delete_feature_online_store,
        "aiplatform.googleapis.com/CachedContent": _delete_cached_content,
        "aiplatform.googleapis.com/PipelineJob": _delete_pipeline_job,
        "aiplatform.googleapis.com/Tensorboard": _delete_tensorboard,
        "aiplatform.googleapis.com/TuningJob": _delete_tuning_job,
        "aiplatform.googleapis.com/MetadataStore": _delete_metadata_store,
    }

    DELETION_ORDER = [
        "aiplatform.googleapis.com/Model",
        "aiplatform.googleapis.com/Endpoint",
        "aiplatform.googleapis.com/Dataset",
        "aiplatform.googleapis.com/TrainingPipeline",
        "aiplatform.googleapis.com/PipelineJob",
        "aiplatform.googleapis.com/TuningJob",
        "aiplatform.googleapis.com/BatchPredictionJob",
        "aiplatform.googleapis.com/HyperparameterTuningJob",
        "aiplatform.googleapis.com/FeatureOnlineStore",
        "aiplatform.googleapis.com/CachedContent",
        "aiplatform.googleapis.com/Tensorboard",
        "aiplatform.googleapis.com/CustomJob",
        "aiplatform.googleapis.com/MetadataStore",
    ]

    # NOTE(review): the original line carried a bare trailing "# 3", presumably an
    # alternative value that was used at some point - confirm.
    SLEEP_AFTER_EACH_REQUEST = 0.01
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _delete_transfer_config(resource: dict, log_prefix: str):
    """Run ``bq rm --transfer_config`` on the path returned for *resource*."""
    transfer_config = get_resource_path(resource)
    await run_command_async(f"bq rm -f --transfer_config {transfer_config}", log_prefix)


async def _delete_table(resource: dict, log_prefix: str):
    """
    Run ``bq rm -t`` for a table.

    The resource path must split into exactly six ``/``-separated parts;
    otherwise the unpacking raises ``ValueError``.
    """
    segments = get_resource_path(resource).split("/")
    _, project_id, _, dataset, _, table = segments
    table_ref = f"{project_id}:{dataset}.{table}"
    await run_command_async(f"bq rm -f -t {table_ref}", log_prefix)


async def _delete_dataset(resource: dict, log_prefix: str):
    """
    Run ``bq rm -r -d`` for a dataset.

    The resource path must split into exactly four ``/``-separated parts.
    """
    segments = get_resource_path(resource).split("/")
    _, project_id, _, dataset = segments
    dataset_ref = f"{project_id}:{dataset}"
    await run_command_async(f"bq rm -r -f -d {dataset_ref}", log_prefix)


async def _delete_model(resource: dict, log_prefix: str):
    """
    Run ``bq rm --model`` for a model.

    The resource path must split into exactly six ``/``-separated parts.
    """
    segments = get_resource_path(resource).split("/")
    _, project_id, _, dataset, _, model = segments
    model_ref = f"{project_id}:{dataset}.{model}"
    await run_command_async(f"bq rm -f --model {model_ref}", log_prefix)


class BigQueryDeleteHandler(BaseDeleteHandler):
    # Asset type -> coroutine that deletes one resource of that type.
    DELETERS = {
        "bigquery.googleapis.com/Dataset": _delete_dataset,
        "bigquery.googleapis.com/Model": _delete_model,
        "bigquery.googleapis.com/Table": _delete_table,
        "bigquerydatatransfer.googleapis.com/TransferConfig": _delete_transfer_config,
    }

    # Datasets are listed last, after the tables and models.
    DELETION_ORDER = [
        "bigquerydatatransfer.googleapis.com/TransferConfig",
        "bigquery.googleapis.com/Table",
        "bigquery.googleapis.com/Model",
        "bigquery.googleapis.com/Dataset",
    ]
import datetime

from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async

# Grace period in days: a Composer environment created within the last
# DAYS_PROTECTED days is skipped; an older one is deleted.
DAYS_PROTECTED = 2


def _parse_create_time(raw: str) -> datetime.datetime:
    """
    Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    ``datetime.fromisoformat`` accepts a trailing ``Z`` only from Python 3.11,
    so the suffix is rewritten to ``+00:00`` first. A timestamp without an
    offset is treated as UTC (assumption - confirm against the asset data),
    which keeps the later subtraction from an aware "now" from raising
    ``TypeError``.
    """
    if raw.endswith(("Z", "z")):
        raw = raw[:-1] + "+00:00"
    parsed = datetime.datetime.fromisoformat(raw)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed


async def _delete_composer_environment(resource: dict, log_prefix: str):
    """
    Delete a Composer environment unless it is still inside the grace period.

    :param resource: asset dict; ``createTime`` and ``location`` are read from it.
    :param log_prefix: forwarded to ``run_command_async``.
    :return: ``False`` when the environment was skipped, otherwise ``None``.
    """
    name = get_resource_path(resource)
    created_at = _parse_create_time(resource["createTime"])
    age = datetime.datetime.now(datetime.timezone.utc) - created_at
    if age <= datetime.timedelta(days=DAYS_PROTECTED):
        print(f"Composer env with name: {name} was skipped because it is protected for {DAYS_PROTECTED} days.")
        return False
    cmd = f"gcloud composer environments delete {name} --location={resource['location']} --quiet"
    await run_command_async(cmd, log_prefix)


class ComposerDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "composer.googleapis.com/Environment": _delete_composer_environment,
    }

    DELETION_ORDER = [
        "composer.googleapis.com/Environment",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _run(cmd, log_prefix=None):
    """Run *cmd*, forwarding *log_prefix* only when the caller supplied one."""
    if log_prefix is None:
        await run_command_async(cmd)
    else:
        await run_command_async(cmd, log_prefix)


async def delete_instance_group_manager(resource, log_prefix=None):
    """
    Delete a managed instance group.

    :param resource: asset dict passed to ``get_resource_path``.
    :param log_prefix: optional log prefix. It was added so this deleter accepts
        the same ``(resource, log_prefix)`` call as the other handlers; the
        original one-argument call still works.
    """
    name = get_resource_path(resource)
    cmd = f"gcloud compute instance-groups managed delete {name}"
    await _run(cmd, log_prefix)


async def delete_disk(resource, log_prefix=None):
    """
    Delete a disk unless ``additionalAttributes.users`` shows it is still in use.

    :param resource: asset dict; ``additionalAttributes`` and ``location`` are read.
    :param log_prefix: optional log prefix (see ``delete_instance_group_manager``).
    :return: ``False`` when the disk was skipped, otherwise ``None``.
    """
    name = get_resource_path(resource)
    users = (resource.get("additionalAttributes") or {}).get("users")
    if users:
        print(
            f"compute.googleapis.com/Disk with name: {name} was skipped "
            f"because it is still used by: {users}"
        )
        return False
    cmd = f"gcloud compute disks delete {name} --zone={resource['location']} --quiet"
    await _run(cmd, log_prefix)


class ComputeDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "compute.googleapis.com/InstanceGroupManager": delete_instance_group_manager,
        "compute.googleapis.com/Disk": delete_disk,
    }

    DELETION_ORDER = [
        "compute.googleapis.com/InstanceGroupManager",
        "compute.googleapis.com/Disk",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async

# Job states for which the job is archived directly, without cancelling first.
FINISHED_JOB_STATUSES = ["JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_DONE", "JOB_STATE_FAILED"]


async def _delete_dataflow_job(resource: dict, log_prefix: str):
    """
    Archive a Dataflow job, cancelling it first when its state is not finished.

    For an unfinished job the cancel and archive commands are joined with
    ``&&`` and passed to ``run_command_async`` as a single command string.
    """
    current_state = resource.get("state")
    region = resource.get("location")
    job_id = get_resource_path(resource)

    command = f"gcloud dataflow jobs archive {job_id} --region={region} --quiet"
    if current_state not in FINISHED_JOB_STATUSES:
        cancel_cmd = f"gcloud dataflow jobs cancel {job_id} --region={region} --force --quiet"
        command = f"{cancel_cmd} && {command}"
    await run_command_async(command, log_prefix)


class DataflowDeleteHandler(BaseDeleteHandler):
    DELETERS = {"dataflow.googleapis.com/Job": _delete_dataflow_job}

    DELETION_ORDER = [
        "dataflow.googleapis.com/Job",
    ]
b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataform.py new file mode 100644 index 0000000000000..27bcc6c28011b --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataform.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import curl, get_resource_path

API_BASE = "https://dataform.googleapis.com/v1/"


async def _curl_resource_url(resource: dict, log_prefix: str):
    """
    Call ``curl`` on the Dataform v1 URL built from the resource path.

    NOTE(review): presumably ``curl`` issues an HTTP DELETE - confirm in helpers.
    """
    target_url = f"{API_BASE}{get_resource_path(resource)}"
    await curl(target_url, log_prefix)


async def _delete_dataform_workflow_invocation(resource: dict, log_prefix: str):
    """Delete a workflow invocation through its API URL."""
    await _curl_resource_url(resource, log_prefix)


async def _delete_dataform_workspace(resource: dict, log_prefix: str):
    """Delete a workspace through its API URL."""
    await _curl_resource_url(resource, log_prefix)


async def _delete_dataform_workflow_config(resource: dict, log_prefix: str):
    """Delete a workflow config through its API URL."""
    await _curl_resource_url(resource, log_prefix)


async def _delete_dataform_release_config(resource: dict, log_prefix: str):
    """Delete a release config through its API URL."""
    await _curl_resource_url(resource, log_prefix)


async def _delete_dataform_repository(resource: dict, log_prefix: str):
    """Delete a repository through its API URL."""
    await _curl_resource_url(resource, log_prefix)


class DataformDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "dataform.googleapis.com/Workspace": _delete_dataform_workspace,
        "dataform.googleapis.com/WorkflowInvocation": _delete_dataform_workflow_invocation,
        "dataform.googleapis.com/WorkflowConfig": _delete_dataform_workflow_config,
        "dataform.googleapis.com/ReleaseConfig": _delete_dataform_release_config,
        "dataform.googleapis.com/Repository": _delete_dataform_repository,
    }

    # The repository is listed last, after the other Dataform resource types.
    DELETION_ORDER = [
        "dataform.googleapis.com/WorkflowInvocation",
        "dataform.googleapis.com/Workspace",
        "dataform.googleapis.com/ReleaseConfig",
        "dataform.googleapis.com/WorkflowConfig",
        "dataform.googleapis.com/Repository",
    ]
b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataplex.py new file mode 100644 index 0000000000000..71ae2319be5e5 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dataplex.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _delete_entry_group(resource: dict, log_prefix: str):
    """
    Delete a Dataplex entry group.

    The resource path must split into exactly six ``/``-separated parts; the
    second is used as the project and the last as the entry group name.
    """
    location = resource.get("location")
    path = get_resource_path(resource)
    _, project_id, _, _, _, name = path.split("/")
    cmd = f"gcloud dataplex entry-groups delete {name} --location={location} --project={project_id}"
    await run_command_async(cmd, log_prefix)


async def _delete_lake(resource: dict, log_prefix: str):
    """
    Delete a Dataplex lake.

    The project parsed from the resource path is passed as ``--project``, as
    ``_delete_entry_group`` does, so the command targets the resource's own
    project rather than gcloud's configured default.
    """
    location = resource.get("location")
    path = get_resource_path(resource)
    _, project_id, _, _, _, lake = path.split("/")
    cmd = f"gcloud dataplex lakes delete {lake} --location={location} --project={project_id}"
    await run_command_async(cmd, log_prefix)


async def _delete_task(resource: dict, log_prefix: str):
    """
    Delete a Dataplex task that belongs to a lake.

    The resource path must split into exactly eight ``/``-separated parts; the
    project, lake and task ids are taken from it.
    """
    location = resource.get("location")
    path = get_resource_path(resource)
    _, project_id, _, _, _, lake, _, task = path.split("/")
    cmd = (
        f"gcloud dataplex tasks delete {task} --location={location} "
        f"--lake={lake} --project={project_id}"
    )
    await run_command_async(cmd, log_prefix)


class DataplexDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "dataplex.googleapis.com/EntryGroup": _delete_entry_group,
        "dataplex.googleapis.com/Lake": _delete_lake,
        "dataplex.googleapis.com/Task": _delete_task,
    }

    # Tasks are listed before the lakes that contain them.
    DELETION_ORDER = [
        "dataplex.googleapis.com/EntryGroup",
        "dataplex.googleapis.com/Task",
        "dataplex.googleapis.com/Lake",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import run_command_async


def __extract_params(resource: dict) -> tuple[str, str]:
    """
    Return ``(name, region)`` for a Dataproc resource.

    The name is the last ``/``-separated segment of ``resource["name"]`` (an
    empty string when the key is missing).

    :raises TypeError: when ``resource["location"]`` is not a string.
    """
    location = resource.get("location")
    if isinstance(location, str):
        short_name = resource.get("name", "").split("/")[-1]
        return short_name, location
    raise TypeError("Dataproc resource location must be a string.")


async def __run_delete_cmd(asset_type: str, resource: dict, prefix: str):
    """
    Build and run ``gcloud dataproc <asset_type> delete`` for *resource*.
    """
    short_name, location = __extract_params(resource)
    await run_command_async(
        f"gcloud dataproc {asset_type} delete {short_name} --quiet --region={location}",
        log_prefix=prefix,
    )


async def _delete_job(resource: dict, prefix: str):
    """Delete a Dataproc job."""
    await __run_delete_cmd("jobs", resource, prefix)


async def _delete_batch(resource: dict, prefix: str):
    """Delete a Dataproc batch."""
    await __run_delete_cmd("batches", resource, prefix)


async def _delete_cluster(resource: dict, prefix: str):
    """Delete a Dataproc cluster."""
    await __run_delete_cmd("clusters", resource, prefix)


async def _delete_workflow_template(resource: dict, prefix: str):
    """Delete a Dataproc workflow template."""
    await __run_delete_cmd("workflow-templates", resource, prefix)


class DataprocDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "dataproc.googleapis.com/Job": _delete_job,
        "dataproc.googleapis.com/Batch": _delete_batch,
        "dataproc.googleapis.com/Cluster": _delete_cluster,
        "dataproc.googleapis.com/WorkflowTemplate": _delete_workflow_template,
    }

    DELETION_ORDER = [
        "dataproc.googleapis.com/Job",
        "dataproc.googleapis.com/Batch",
        "dataproc.googleapis.com/Cluster",
        "dataproc.googleapis.com/WorkflowTemplate",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _delete_dataproc_metastore_service(resource: dict, log_prefix: str):
    """Delete a Dataproc Metastore service with ``gcloud metastore services delete``."""
    location = resource.get("location")
    name = get_resource_path(resource)
    cmd = f"gcloud metastore services delete {name} --location={location} --quiet"
    await run_command_async(cmd, log_prefix)


async def _delete_dataproc_metastore_backup(resource: dict, log_prefix: str):
    """
    Delete a Dataproc Metastore backup.

    Backups are removed with ``gcloud metastore services backups delete``. The
    previous command, ``services delete``, was a copy of the service deleter
    and targeted a service using the backup's path.

    NOTE(review): this assumes ``get_resource_path`` returns the fully
    qualified backup path (``projects/.../services/.../backups/...``), so no
    separate ``--service`` flag is needed - confirm against the helper.
    """
    location = resource.get("location")
    name = get_resource_path(resource)
    cmd = f"gcloud metastore services backups delete {name} --location={location} --quiet"
    await run_command_async(cmd, log_prefix)


class DataprocMetastoreDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "metastore.googleapis.com/Service": _delete_dataproc_metastore_service,
        "metastore.googleapis.com/Backup": _delete_dataproc_metastore_backup,
    }

    # Backups are listed before the services they belong to.
    DELETION_ORDER = [
        "metastore.googleapis.com/Backup",
        "metastore.googleapis.com/Service",
    ]
b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dlp.py new file mode 100644 index 0000000000000..bd1d1dd628d3d --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/handlers/dlp.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _delete_dlp_job(resource: dict, log_prefix: str):
    """Delete a DLP job, identified by the last segment of its resource path."""
    resource_path = get_resource_path(resource)
    await run_command_async(
        f"gcloud alpha dlp jobs delete {resource_path.split('/')[-1]} --quiet",
        log_prefix,
    )


async def _delete_dlp_job_trigger(resource: dict, log_prefix: str):
    """Delete a DLP job trigger, identified by the last segment of its resource path."""
    resource_path = get_resource_path(resource)
    await run_command_async(
        f"gcloud alpha dlp job-triggers delete {resource_path.split('/')[-1]} --quiet",
        log_prefix,
    )


class DLPDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "dlp.googleapis.com/DlpJob": _delete_dlp_job,
        "dlp.googleapis.com/JobTrigger": _delete_dlp_job_trigger,
    }

    DELETION_ORDER = [
        "dlp.googleapis.com/DlpJob",
        "dlp.googleapis.com/JobTrigger",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


def __extract_params(resource: dict) -> tuple[str, str]:
    """
    Return ``(name, region)`` for a Managed Kafka resource.

    :raises TypeError: when ``resource["location"]`` is not a string.
    """
    location = resource.get("location")
    if isinstance(location, str):
        return get_resource_path(resource), location
    raise TypeError("Managed Kafka resource location must be a string.")


async def _delete_managed_kafka_cluster(resource: dict, log_prefix: str):
    """Delete a Managed Kafka cluster with ``gcloud managed-kafka clusters delete``."""
    cluster_name, region = __extract_params(resource)
    await run_command_async(
        f"gcloud managed-kafka clusters delete {cluster_name} --location={region} --quiet",
        log_prefix,
    )


class ManagedKafkaDeleteHandler(BaseDeleteHandler):
    DELETERS = {"managedkafka.googleapis.com/Cluster": _delete_managed_kafka_cluster}

    DELETION_ORDER = [
        "managedkafka.googleapis.com/Cluster",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _delete_kubernetes_engine_cluster(resource: dict, log_prefix: str):
    """
    Delete a GKE cluster with ``gcloud container clusters delete``.

    :param resource: asset dict; ``location`` is read and passed as ``--location``.
    :param log_prefix: forwarded to ``run_command_async``.
    """
    location = resource.get("location")
    name = get_resource_path(resource)
    cmd = f"gcloud container clusters delete {name} --location={location} --quiet"
    await run_command_async(cmd, log_prefix)


# Alias for the original, misspelled name, kept so existing references still work.
_delete_kubernetes_engine_custer = _delete_kubernetes_engine_cluster


class GKEDeleteHandler(BaseDeleteHandler):
    DELETERS = {"container.googleapis.com/Cluster": _delete_kubernetes_engine_cluster}

    DELETION_ORDER = [
        "container.googleapis.com/Cluster",
    ]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async

# Sinks with these names are never deleted.
PROTECTED_NAMES = ["_Default", "_Required"]


async def _delete_log_sink(resource: dict, log_prefix: str):
    """
    Delete a log sink unless it is one of ``PROTECTED_NAMES``.

    A sink is protected when either its ``displayName`` or the last segment of
    its resource path is listed, so a resource without ``displayName`` neither
    raises ``KeyError`` nor slips past the check.

    :return: ``False`` when the sink was skipped (the value the other handlers
        return on their skip paths), otherwise ``None``.
    """
    name = get_resource_path(resource)
    sink_id = name.rsplit("/", 1)[-1]
    if resource.get("displayName") in PROTECTED_NAMES or sink_id in PROTECTED_NAMES:
        print(f"Log sink with name: {name} was skipped because it is protected.")
        return False
    cmd = f"gcloud logging sinks delete {name} --quiet"
    await run_command_async(cmd, log_prefix)


class LoggingDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "logging.googleapis.com/LogSink": _delete_log_sink,
    }

    DELETION_ORDER = [
        "logging.googleapis.com/LogSink",
    ]
import asyncio

import vertex_ray

from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.handlers.ai import aiplatform_client

# Held for the whole synchronous run so only one runs at a time.
sdk_lock = asyncio.Lock()


async def delete_ray_cluster(resource, _):
    """
    Delete every Ray cluster listed for the resource's project and location.

    The second argument (the caller's log prefix) is ignored because the
    synchronous worker prints its own ``[i/total]`` prefix.
    """
    async with sdk_lock:
        outcome = await asyncio.to_thread(_sync_delete_ray_on_vertexai_clusters, resource)
    return outcome


def _sync_delete_ray_on_vertexai_clusters(resource):
    """
    Blocking worker: list the Ray clusters and delete them one by one.

    A failure on one cluster is printed and does not stop the loop.

    :return: ``False`` when no clusters were listed, otherwise ``True`` -
        including when individual deletions failed.
    """
    aiplatform_client(resource["project"], resource["location"])
    found = vertex_ray.list_ray_clusters()
    total_count = len(found)

    if not total_count:
        print(f"No Ray clusters found for {resource['project']} in {resource['location']}.")
        return False

    for position, ray_cluster in enumerate(found, start=1):
        prefix = f"[{position}/{total_count}]"
        try:
            vertex_ray.delete_ray_cluster(ray_cluster.cluster_resource_name)
            print(f"{prefix} Deleted: {ray_cluster.cluster_resource_name}")
        except Exception as e:
            # Best effort: report the failure and move on to the next cluster.
            print(f"Failed: {ray_cluster.cluster_resource_name} - {e}")

    return True


class RayClusterOnVertexAIDeleteHandler(BaseDeleteHandler):
    DELETERS = {"tp_vertex_ai_raycluster": delete_ray_cluster}

    DELETION_ORDER = ["tp_vertex_ai_raycluster"]
from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


def _path_parts(path: str) -> list[str]:
    """Split *path* on ``/`` and drop empty segments."""
    return [part for part in path.split("/") if part]


def _leaf_id(path: str) -> str:
    """
    Return the last non-empty segment of *path*.

    :raises ValueError: when *path* has no non-empty segment.
    """
    parts = _path_parts(path)
    if not parts:
        raise ValueError(f"Cannot extract a Spanner resource id from path: {path!r}")
    return parts[-1]


def _instance_and_child(path: str, collection: str) -> tuple[str, str]:
    """
    Return ``(instance_id, child_id)`` for a resource nested under an instance.

    Two shapes are accepted: the short ``<instance>/<child>`` form the original
    unpacking expected, and a longer path holding ``instances/<instance>`` and
    ``<collection>/<child>`` pairs.

    :raises ValueError: when the ids cannot be located in *path*.
    """
    parts = _path_parts(path)
    if len(parts) == 2:
        return parts[0], parts[1]
    try:
        instance = parts[parts.index("instances") + 1]
        child = parts[parts.index(collection) + 1]
    except (ValueError, IndexError):
        raise ValueError(f"Cannot extract instance and {collection} ids from path: {path!r}") from None
    return instance, child


async def _delete_database(resource: dict, log_prefix: str):
    """Delete a Spanner database from its parent instance."""
    instance, database = _instance_and_child(get_resource_path(resource), "databases")
    cmd = f"gcloud spanner databases delete {database} --instance={instance}"
    await run_command_async(cmd, log_prefix)


async def _delete_backup(resource: dict, log_prefix: str):
    """Delete a Spanner backup from its parent instance."""
    instance, backup = _instance_and_child(get_resource_path(resource), "backups")
    cmd = f"gcloud spanner backups delete {backup} --instance={instance}"
    await run_command_async(cmd, log_prefix)


async def _delete_instance(resource: dict, log_prefix: str):
    """
    Delete a Spanner instance.

    The id is the last path segment. The previous code interpolated the whole
    ``split("/")`` list into the command.
    """
    instance = _leaf_id(get_resource_path(resource))
    cmd = f"gcloud spanner instances delete {instance}"
    await run_command_async(cmd, log_prefix)


async def _delete_instance_config(resource: dict, log_prefix: str):
    """Delete a Spanner instance config, identified by the last path segment."""
    instance_config = _leaf_id(get_resource_path(resource))
    cmd = f"gcloud spanner instance-configs delete {instance_config}"
    await run_command_async(cmd, log_prefix)


async def _delete_instance_partition(resource: dict, log_prefix: str):
    """Delete a Spanner instance partition from its parent instance."""
    instance, instance_partition = _instance_and_child(get_resource_path(resource), "instancePartitions")
    cmd = f"gcloud spanner instance-partitions delete {instance_partition} --instance={instance}"
    await run_command_async(cmd, log_prefix)


class SpannerDeleteHandler(BaseDeleteHandler):
    DELETERS = {
        "spanner.googleapis.com/Database": _delete_database,
        "spanner.googleapis.com/Backup": _delete_backup,
        "spanner.googleapis.com/Instance": _delete_instance,
        "spanner.googleapis.com/InstanceConfig": _delete_instance_config,
        "spanner.googleapis.com/InstancePartition": _delete_instance_partition,
    }

    # Instances are listed last, after everything nested under them.
    DELETION_ORDER = [
        "spanner.googleapis.com/Backup",
        "spanner.googleapis.com/Database",
        "spanner.googleapis.com/InstanceConfig",
        "spanner.googleapis.com/InstancePartition",
        "spanner.googleapis.com/Instance",
    ]
"""Delete handlers for Cloud SQL assets."""

from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import curl, get_resource_path, run_command_async


async def _delete_instance(resource: dict, log_prefix: str):
    """
    Remove a Cloud SQL instance through ``gcloud sql instances delete``.

    :param resource: Asset dictionary describing the instance.
    :param log_prefix: Prefix added to every log line of the command run.
    """
    instance_path = get_resource_path(resource)
    # The second path segment is used as the project ID.
    owner = instance_path.split("/")[1]
    await run_command_async(
        f"gcloud sql instances delete {instance_path} --project={owner} --quiet",
        log_prefix,
    )


async def _delete_backup(resource: dict, log_prefix: str):
    """
    Remove a Cloud SQL backup through ``gcloud sql backups delete``.

    :param resource: Asset dictionary describing the backup.
    :param log_prefix: Prefix added to every log line of the command run.
    """
    backup_path = get_resource_path(resource)
    # The second path segment is used as the project ID.
    owner = backup_path.split("/")[1]
    await run_command_async(
        f"gcloud sql backups delete {backup_path} --project={owner} --quiet",
        log_prefix,
    )


async def _delete_backup_run(resource: dict, log_prefix: str):
    """
    Remove a Cloud SQL backup run by calling the ``sqladmin`` REST endpoint via ``curl``.

    :param resource: Asset dictionary describing the backup run.
    :param log_prefix: Prefix handed to the ``curl`` helper for its log output.
    """
    endpoint = f"https://sqladmin.googleapis.com/sql/v1beta4/{get_resource_path(resource)}"
    await curl(endpoint, log_prefix=log_prefix)


class CloudSQLDeleteHandler(BaseDeleteHandler):
    """Associates Cloud SQL asset types with their deleter coroutines and a deletion order."""

    DELETERS = {
        "sqladmin.googleapis.com/Backup": _delete_backup,
        "sqladmin.googleapis.com/Instance": _delete_instance,
        "sqladmin.googleapis.com/BackupRun": _delete_backup_run,
    }
    # Backup runs and backups are listed ahead of the instance.
    DELETION_ORDER = [
        "sqladmin.googleapis.com/BackupRun",
        "sqladmin.googleapis.com/Backup",
        "sqladmin.googleapis.com/Instance",
    ]
"""Delete handlers for Cloud Storage buckets and Storage Transfer jobs."""

from airflow_google_provider_resource_cleanup.handlers._base import BaseDeleteHandler
from airflow_google_provider_resource_cleanup.helpers import get_resource_path, run_command_async


async def _delete_bucket(resource: dict, log_prefix: str):
    """
    Remove a bucket and its content through ``gcloud storage rm --recursive``.

    :param resource: Asset dictionary describing the bucket.
    :param log_prefix: Prefix added to every log line of the command run.
    """
    bucket = get_resource_path(resource)
    await run_command_async(f"gcloud storage rm --recursive gs://{bucket}", log_prefix)


async def _delete_transfer_job(resource: dict, log_prefix: str):
    """
    Remove a Storage Transfer job through ``gcloud transfer jobs delete``.

    :param resource: Asset dictionary describing the transfer job.
    :param log_prefix: Prefix added to every log line of the command run.
    :raises ValueError: If the resource path does not have exactly four ``/``-separated segments.
    """
    segments = get_resource_path(resource).split("/")
    # Exactly four segments are expected; the 2nd is the project and the 4th the job name.
    _, project_id, _, transfer_job_name = segments
    await run_command_async(
        f"gcloud transfer jobs delete {transfer_job_name} --project {project_id}",
        log_prefix,
    )


class StorageDeleteHandler(BaseDeleteHandler):
    """Associates storage asset types with their deleter coroutines and a deletion order."""

    DELETERS = {
        "storage.googleapis.com/Bucket": _delete_bucket,
        "storagetransfer.googleapis.com/TransferJob": _delete_transfer_job,
    }

    # Transfer jobs are listed ahead of buckets.
    DELETION_ORDER = [
        "storagetransfer.googleapis.com/TransferJob",
        "storage.googleapis.com/Bucket",
    ]
"""Shared helpers for the Google system test resource cleanup commands."""

import asyncio
import json
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any

from . import constants as c


def run_command(cmd: str, log_prefix: str = ""):
    """
    Run ``cmd`` through the system shell and wait for it to finish.

    :param cmd: Shell command line to execute.
    :param log_prefix: Text put in front of the log line.
    """
    print(f'{log_prefix}Executing the command: "{cmd}"...')
    os.system(cmd)


async def run_command_async(cmd: str, log_prefix: str = ""):
    """
    Run ``cmd`` through the shell without blocking the event loop and print its result.

    The exit code is always printed; stdout and stderr are printed only when non-empty.

    :param cmd: Shell command line to execute.
    :param log_prefix: Text put in front of every log line.
    """
    print(f'{log_prefix}Executing the command: "{cmd}"...')
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    print(f"{log_prefix}Command exited with code: {process.returncode}")
    # errors="replace": undecodable bytes in tool output must not abort the cleanup run.
    if stdout:
        print(f"{log_prefix}Stdout:\n{stdout.decode(errors='replace').strip()}")
    if stderr:
        print(f"{log_prefix}Stderr:\n{stderr.decode(errors='replace').strip()}")


def get_resources_file(
    system_tests_project,
    resource_type: str | None = None,
    resource_file_path: Path | None = None,
):
    """
    Return the path of the JSON file that holds the resources of a project.

    :param system_tests_project: Project ID, used as the sub-folder name.
    :param resource_type: When given, the file is ``<resource_type>.json`` instead of ``resources.json``.
    :param resource_file_path: Explicit path that overrides the computed location.
    :return: Path of the resources file.
    """
    if resource_file_path:
        return Path(resource_file_path)

    file_name = "resources.json" if resource_type is None else f"{resource_type}.json"
    return c.RESOURCES_FOLDER / system_tests_project / file_name


def init_directories():
    """Create the resources and output folders if they do not exist yet."""
    for directory in (c.RESOURCES_FOLDER, c.OUTPUT_FOLDER):
        ensure_path(directory)


def ensure_path(directory: Path):
    """Create ``directory`` together with its missing parents; existing folders are left alone."""
    directory.mkdir(parents=True, exist_ok=True)


JsonData = dict[str, Any] | list[Any]


def load_json(file_path: Path) -> JsonData:
    """Read and parse the JSON document stored at ``file_path``."""
    with file_path.open() as file:
        return json.load(file)


def load_project_configuration(project_id) -> dict[str, Any]:
    """
    Load ``<project_id>.json`` from the config folder.

    :param project_id: Project ID that names the configuration file.
    :return: The parsed configuration.
    :raises TypeError: If the file does not contain a JSON object.
    """
    config_file = c.CONFIG_FOLDER / f"{project_id}.json"
    data = load_json(config_file)
    if not isinstance(data, dict):
        raise TypeError(f'Expected project configuration "{config_file}" to be a JSON object.')
    return data


def check_white_list(resource: dict, config: "GCPProjectConfig", silent: bool = False) -> bool:
    """
    Tell whether ``resource`` may be deleted.

    A resource is rejected when its asset type is protected, when its name is listed
    in the project configuration, or when one of its label keys or values is protected.

    :param resource: Asset dictionary to inspect.
    :param config: Project configuration that lists protected resources per asset type.
    :param silent: When true, the reason for a rejection is not printed.
    :return: ``True`` if nothing protects the resource, ``False`` otherwise.
    """
    asset_type = resource.get("assetType", "N/a")

    if asset_type in c.DO_NOT_DELETE_ASSET_TYPES:
        if not silent:
            print(f'The asset type: "{asset_type}" is protected. Ignoring!')
        return False

    name = resource.get("name", "N/a")

    if name in config.protected_resources.get(asset_type, []):
        if not silent:
            print(f'The resource: "{name}" is protected. Ignoring!')
        return False

    labels = resource.get("labels") or {}
    for label_key, label_value in labels.items():
        if label_key in c.DO_NOT_DELETE_LABELS or label_value in c.DO_NOT_DELETE_LABELS:
            if not silent:
                print(f'The label: "{label_key}={label_value}" is protected for resource "{name}". Ignoring!')
            return False

    return True


def get_aiplatform_client_options(resource: dict):
    """
    Build the client options that select the Vertex AI endpoint of a resource.

    :param resource: Asset dictionary; its ``location`` selects the regional endpoint.
    :return: Dictionary with the ``api_endpoint`` key.
    """
    location = resource.get("location")
    if not location or location == "global":
        # No region to prepend: fall back to the location-less endpoint instead of
        # producing a host such as "None-aiplatform.googleapis.com".
        api_endpoint = "aiplatform.googleapis.com"
    else:
        api_endpoint = f"{location}-aiplatform.googleapis.com"
    return {"api_endpoint": api_endpoint}


def get_resource_path(resource: dict) -> str:
    """
    Return the part of the resource name that follows the ``googleapis.com/`` host.

    :param resource: Asset dictionary with a ``name`` entry.
    :return: The resource name without the service host prefix.
    :raises TypeError: If the name is missing or not a string.
    :raises ValueError: If the name does not contain ``googleapis.com/``.
    """
    raw_name = resource.get("name")
    if not isinstance(raw_name, str):
        raise TypeError("Resource name must be a string.")
    # Cut at the first marker only, so a path that mentions another service host stays intact.
    _, marker, name = raw_name.partition("googleapis.com/")
    if not marker:
        raise ValueError(f'Resource name "{raw_name}" does not contain "googleapis.com/".')
    return name


def get_resource_name_for_compute(resource: dict) -> str:
    """
    Return the resource name with the ``//compute.googleapis.com/`` prefix removed.

    :param resource: Asset dictionary with a ``name`` entry.
    :raises TypeError: If the name is missing or not a string.
    """
    resource_path = resource.get("name")
    if not isinstance(resource_path, str):
        raise TypeError("Resource name must be a string.")
    return resource_path.replace("//compute.googleapis.com/", "")


async def curl(url, method="DELETE", log_prefix=""):
    """
    Call ``url`` with ``curl``, authorised by the current ``gcloud`` access token.

    :param url: Endpoint to call.
    :param method: HTTP method passed to ``curl -X``.
    :param log_prefix: Text put in front of every log line.
    """
    cmd = f"""
    curl -X {method} \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "{url}"
    """
    try:
        # Forward the prefix so the command output is attributed like every other deleter.
        await run_command_async(cmd, log_prefix)
    except Exception as e:
        # Best effort: a failed call is reported and must not stop the remaining deletions.
        print(f'{log_prefix}Error while running the curl command: "{cmd}"!', e)


def dump_json(file_path: Path, data: JsonData) -> None:
    """Serialise ``data`` as JSON into ``file_path``, replacing any previous content."""
    with file_path.open("w") as file:
        json.dump(data, file)


@dataclass
class GCPProjectConfig:
    """
    JSON config representation for a GCP project.

    Example::

        {
            "project_id": "my-project",
            "protected_resources": {
                "storage.googleapis.com/Bucket": [
                    "//storage.googleapis.com/providers-dashboard-temp-logs-dev"
                ]
            }
        }

    :param project_id: The ID of the GCP project.
    :param protected_resources: Asset types mapped to resource names that should not be deleted.
    :param default_location: The default GCP location to use for resources.
    """

    project_id: str
    protected_resources: dict[str, list[str]] = field(default_factory=dict)
    default_location: str = "us-central1"

    @classmethod
    def load_from_file(cls, project_id, config_path: Path | None = None):
        """
        Load the configuration of ``project_id`` and write it back to its file.

        A missing file yields a configuration with default values. The file is always
        (re)written, so a default configuration is created on first use.

        :param project_id: Project ID; always wins over a ``project_id`` stored in the file.
        :param config_path: Explicit config file; defaults to ``<project_id>.json`` in the config folder.
        :return: The loaded configuration.
        :raises TypeError: If the file does not contain a JSON object.
        """
        file = Path(config_path) if config_path else c.CONFIG_FOLDER / f"{project_id}.json"
        data = load_json(file) if file.exists() else {}
        if not isinstance(data, dict):
            raise TypeError(f'Expected project configuration "{file}" to be a JSON object.')
        # The caller's project_id overrides the one defined in the json file.
        data.pop("project_id", None)
        cfg = cls(project_id=project_id, **data)

        # Always make sure the parent folder exists and persist the effective configuration.
        ensure_path(file.parent)
        dump_json(file, asdict(cfg))
        return cfg


def get_tp_asset_data_item(project_id, asset_type="tp_vertex_ai_raycluster", config_path: Path | None = None):
    """
    Build a resource entry for ``asset_type`` located in the project's default location.

    :param project_id: Project ID stored in the entry and used to load the configuration.
    :param asset_type: Asset type stored in the entry.
    :param config_path: Optional explicit project configuration file.
    :return: Dictionary with ``assetType``, ``project`` and ``location``.
    """
    cfg = GCPProjectConfig.load_from_file(project_id, config_path=config_path)
    return {
        "assetType": asset_type,
        "project": project_id,
        "location": cfg.default_location,
    }


def provide_tp_asset_resource_file(
    project_id,
    asset_type,
    resource_file_path: Path | None = None,
    config_path: Path | None = None,
):
    """
    Write a resources file that contains the single entry built for ``asset_type``.

    :param project_id: Project ID the resources file belongs to.
    :param asset_type: Asset type of the entry; also names the default file.
    :param resource_file_path: Optional explicit resources file.
    :param config_path: Optional explicit project configuration file.
    """
    resources_file = get_resources_file(project_id, asset_type, resource_file_path=resource_file_path)
    ensure_path(resources_file.parent)
    dump_json(
        resources_file,
        [get_tp_asset_data_item(project_id, asset_type, config_path=config_path)],
    )
b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/base.html b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/base.html new file mode 100644 index 0000000000000..c84a9224af9f7 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/base.html @@ -0,0 +1,145 @@ + + + + + + + + GCP {{ resource_type | title }} Hierarchy for {{ project_name }} + + + +
+

GCP {{ resource_type | title }} Hierarchy

+
+ + +
+ {% block content %}{% endblock %} +
+ + + diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/macros.html b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/macros.html new file mode 100644 index 0000000000000..f997db8d29d41 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/macros.html @@ -0,0 +1,37 @@ + + +{% macro render_node(node, is_header=False) %} + {% if node.children %} +
+ {{ render_node_content(node, is_header) }} +
    + {% for child in node.children %} + {{ render_node(child) }} + {% endfor %} +
+
+ {% else %} +
  • {{ render_node_content(node, is_header) }}
  • + {% endif %} +{% endmacro %} + +{% macro render_node_content(node, is_header=False) %} + {{ node.display_name }} ({{ node.asset_type_short }} @{{ node.location }}{% if is_header %}, ID: {{ node.resource_id }}{% endif %}){% if node.is_protected %} (Protected){% endif %} +{% endmacro %} diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resource_name.html b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resource_name.html new file mode 100644 index 0000000000000..6b86617b2ac2e --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resource_name.html @@ -0,0 +1,29 @@ + + +{% extends 'base.html' %} +{% from 'macros.html' import render_node %} + +{% block content %} +
      + {% for node in resource_tree %} + {{ render_node(node, is_header=True) }} + {% endfor %} +
    +{% endblock %} diff --git a/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resources.html b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resources.html new file mode 100644 index 0000000000000..6b86617b2ac2e --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/airflow_google_provider_resource_cleanup/templates/resources.html @@ -0,0 +1,29 @@ + + +{% extends 'base.html' %} +{% from 'macros.html' import render_node %} + +{% block content %} +
      + {% for node in resource_tree %} + {{ render_node(node, is_header=True) }} + {% endfor %} +
    +{% endblock %} diff --git a/providers/google/tests/system/google/resources_cleanup/config/__init__.py b/providers/google/tests/system/google/resources_cleanup/config/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/config/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/data/__init__.py b/providers/google/tests/system/google/resources_cleanup/data/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/data/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/pyproject.toml b/providers/google/tests/system/google/resources_cleanup/pyproject.toml new file mode 100644 index 0000000000000..71fcdbfe2cb57 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/pyproject.toml @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+[build-system] +requires = ["setuptools>=64.0.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "apache-airflow-google-provider-resource-cleanup" +version = "0.0.1" +description = "CLI tool to clean up resources created by Apache Airflow Google provider system tests" +requires-python = ">=3.11,<3.12" +dependencies = [ + "google-cloud-aiplatform[ray]", + "ray", + "jinja2", +] + +[project.scripts] +airflow-google-system-test-cleanup = "airflow_google_provider_resource_cleanup.__main__:main" + +[project.optional-dependencies] +test = [ + "pytest", + "pytest-mock", + "pytest-asyncio", +] +lint = [ + "ruff", +] + + +[tool.setuptools.packages.find] +include = ["airflow_google_provider_resource_cleanup*"] + +[tool.setuptools.package-data] +airflow_google_provider_resource_cleanup = ["templates/*.html"] + +[tool.pytest.ini_options] +pythonpath = ["."] +testpaths = ["tests"] +python_files = ["test_*.py"] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" + +[tool.ruff] +line-length = 120 +target-version = "py311" + + +[tool.ruff.lint] +select = [ + "E", "W", # pycodestyle + "F", # Pyflakes + "I", # isort +] +ignore = [] + + +[tool.ruff.format] +indent-style = "space" +quote-style = "double" diff --git a/providers/google/tests/system/google/resources_cleanup/tests/__init__.py b/providers/google/tests/system/google/resources_cleanup/tests/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/tests/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/system/google/resources_cleanup/tests/conftest.py b/providers/google/tests/system/google/resources_cleanup/tests/conftest.py new file mode 100644 index 0000000000000..4c90e4fcd09d4 --- /dev/null +++ b/providers/google/tests/system/google/resources_cleanup/tests/conftest.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
"""Shared pytest fixtures for the resource cleanup command tests."""

import argparse
from unittest.mock import MagicMock, patch

import pytest

from airflow_google_provider_resource_cleanup import helpers


@pytest.fixture
def mock_args():
    """Provide an ``argparse.Namespace`` filled with the default CLI argument values."""
    defaults = {
        "project_id": "test-proj",
        "asset_type": None,
        "sync": False,
        "resources_file_path": None,
        "config_path": None,
        "min_age_days": None,
        "skip_asset_type": [],
    }
    return argparse.Namespace(**defaults)


@pytest.fixture
def config_mock():
    """Provide a ``MagicMock`` standing in for ``GCPProjectConfig``."""
    stand_in = MagicMock()
    return stand_in


@pytest.fixture
def patch_common():
    """Patch ``helpers.init_directories`` for the duration of a test and expose the mock."""
    patcher = patch.object(helpers, "init_directories")
    mock_init = patcher.start()
    try:
        yield {
            "init": mock_init,
        }
    finally:
        # Undo the patch even when the test body fails.
        patcher.stop()
"""Tests for the ``cmd_delete`` command helpers."""

import datetime
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock

import pytest

from airflow_google_provider_resource_cleanup.commands.cmd_delete import check_min_age, handle_asset_type, handle_delete


async def test_handle_asset_type():
    """Only the resource accepted by the white-list check is handed to the delete handler."""
    asset_type = "ai"
    mock_check_white_list = MagicMock()
    # First resource is rejected, second one is accepted.
    mock_check_white_list.side_effect = [False, True]
    resources = [{"name": "res1_protected"}, {"name": "res2_safe"}]
    cfg = MagicMock()

    mock_handler_instance = MagicMock()
    mock_handler_instance.handle = AsyncMock()
    mock_handler_cls = MagicMock(return_value=mock_handler_instance)
    mock_delete_handlers = {"ai": mock_handler_cls}

    await handle_asset_type(
        asset_type,
        resources,
        cfg,
        check_white_list_fn=mock_check_white_list,
        delete_handlers=mock_delete_handlers,
    )

    mock_handler_instance.handle.assert_called_once()
    # First positional argument of handle() is the list of resources to delete.
    called_args = mock_handler_instance.handle.call_args[0][0]
    assert len(called_args) == 1
    assert called_args[0]["name"] == "res2_safe"


@pytest.fixture
def handle_delete_dependencies():
    """Build the mocked collaborators that are injected into ``handle_delete`` as keyword arguments."""
    mock_gcp_cfg_cls = MagicMock()
    mock_get_file = MagicMock()
    mock_load_json = MagicMock()
    mock_handle_asset = AsyncMock()
    mock_provide_tp = MagicMock()
    mock_get_tp_item = MagicMock()

    # The resources file is reported as existing and yields a single compute instance.
    mock_resource_file = MagicMock(spec=Path)
    mock_resource_file.exists.return_value = True
    mock_resource_file.absolute.return_value = "/fake/resources.json"
    mock_get_file.return_value = mock_resource_file
    mock_load_json.return_value = [{"name": "res1", "assetType": "compute.googleapis.com/Instance"}]
    return {
        "gcp_project_config_cls": mock_gcp_cfg_cls,
        "provide_tp_asset_resource_file_fn": mock_provide_tp,
        "get_resources_file_fn": mock_get_file,
        "load_json_fn": mock_load_json,
        "get_tp_asset_data_item_fn": mock_get_tp_item,
        "handle_asset_type_fn": mock_handle_asset,
    }


async def test_handle_delete_loads_config(mock_args, handle_delete_dependencies):
    """``handle_delete`` loads the project configuration once, for the requested project."""
    mock_args.asset_type = "compute"
    await handle_delete(None, mock_args, **handle_delete_dependencies)
    handle_delete_dependencies["gcp_project_config_cls"].load_from_file.assert_called_once_with(
        "test-proj", config_path=None
    )


async def test_handle_delete_gets_resources_file(mock_args, handle_delete_dependencies):
    """``handle_delete`` resolves the resources file for the project and asset type."""
    mock_args.asset_type = "compute"
    await handle_delete(None, mock_args, **handle_delete_dependencies)
    handle_delete_dependencies["get_resources_file_fn"].assert_called_once_with(
        "test-proj", "compute", resource_file_path=None
    )


async def test_handle_delete_loads_json(mock_args, handle_delete_dependencies):
    """``handle_delete`` loads the JSON of the file returned by the resources file lookup."""
    mock_args.asset_type = "compute"
    await handle_delete(None, mock_args, **handle_delete_dependencies)
    handle_delete_dependencies["load_json_fn"].assert_called_once_with(
        handle_delete_dependencies["get_resources_file_fn"].return_value
    )


async def test_handle_delete_calls_handle_asset_type(mock_args, handle_delete_dependencies):
    """``handle_delete`` delegates to the asset type handler exactly once."""
    mock_args.asset_type = "compute"
    await handle_delete(None, mock_args, **handle_delete_dependencies)
    handle_delete_dependencies["handle_asset_type_fn"].assert_called_once()


def test_check_min_age():
    """``check_min_age`` results for a missing limit, a missing ``createTime`` and old/new resources."""
    now = datetime.datetime(2026, 5, 20, tzinfo=datetime.UTC)

    # No age limit: accepted even without a creation time.
    assert check_min_age({"name": "res1"}, None, now=now)
    # Age limit set but no creation time: rejected.
    assert not check_min_age({"name": "res1"}, 3, now=now)
    # Four days old with a three day limit: accepted.
    assert check_min_age({"name": "old", "createTime": "2026-05-16T00:00:00+00:00"}, 3, now=now)
    # One day old with a three day limit: rejected.
    assert not check_min_age({"name": "new", "createTime": "2026-05-19T00:00:00+00:00"}, 3, now=now)


async def test_handle_asset_type_filters_by_min_age():
    """With ``min_age_days=3`` only the four day old resource reaches the delete handler."""
    asset_type = "ai"
    now = datetime.datetime.now(datetime.UTC)
    resources = [
        {"name": "old", "createTime": (now - datetime.timedelta(days=4)).isoformat()},
        {"name": "new", "createTime": now.isoformat()},
    ]
    cfg = MagicMock()

    mock_handler_instance = MagicMock()
    mock_handler_instance.handle = AsyncMock()
    mock_handler_cls = MagicMock(return_value=mock_handler_instance)

    await handle_asset_type(
        asset_type,
        resources,
        cfg,
        min_age_days=3,
        check_white_list_fn=MagicMock(return_value=True),
        delete_handlers={"ai": mock_handler_cls},
    )

    mock_handler_instance.handle.assert_called_once_with([resources[0]])


async def test_handle_delete_skips_requested_asset_type(mock_args, handle_delete_dependencies):
    """An asset type listed in ``skip_asset_type`` is neither looked up nor handled."""
    mock_args.asset_type = "composer"
    mock_args.skip_asset_type = ["composer"]

    await handle_delete(None, mock_args, **handle_delete_dependencies)

    handle_delete_dependencies["get_resources_file_fn"].assert_not_called()
    handle_delete_dependencies["handle_asset_type_fn"].assert_not_called()
"""Tests for the ``cmd_list`` command helpers."""

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from airflow_google_provider_resource_cleanup.commands import cmd_list
from airflow_google_provider_resource_cleanup.commands.cmd_list import (
    GCPProjectConfig,
    _print_resources,
    _sync_resources,
    handle_list,
)


@pytest.mark.parametrize(
    "asset_type, expected_asset_arg",
    [
        (None, ""),
        ("compute", ' --asset-types="compute.googleapis.com.*"'),
    ],
)
def test_sync_resources(asset_type, expected_asset_arg):
    """``_sync_resources`` runs the expected ``gcloud asset search-all-resources`` command."""
    with patch.object(cmd_list, "run_command") as mock_run:
        resource_file = Path("/tmp/res.json")
        _sync_resources("test-project", resource_file, asset_type)

        # The asset type filter is only present when an asset type was requested.
        expected_cmd = (
            "gcloud asset search-all-resources --scope projects/test-project"
            f"{expected_asset_arg} --format json >"
            f' "{str(resource_file)}"'
        )
        mock_run.assert_called_once_with(expected_cmd)


def test_print_resources(capsys, config_mock):
    """``_print_resources`` prints the accepted resource and counts accepted and ignored ones."""
    with (
        patch.object(cmd_list, "load_json") as mock_load,
        patch.object(cmd_list, "check_white_list") as mock_check,
    ):
        mock_load.return_value = [
            {
                "name": "res1_protected",
                "assetType": "type1",
                "location": "loc1",
                "labels": {"l1": "v1"},
            },
            {"name": "res2_safe", "assetType": "type2", "location": "loc2"},
        ]

        # Res1 is NOT safe (False), Res2 is safe (True)
        mock_check.side_effect = [False, True]

        _print_resources(Path("fake.json"), config_mock)

        captured = capsys.readouterr()
        assert "(res2_safe)" in captured.out
        assert "Resource Count : 1" in captured.out
        assert "Ignored Resource Count : 1" in captured.out


def test_handle_list(mock_args):
    """With ``sync`` enabled, ``handle_list`` prepares folders, syncs, loads the config and prints."""
    with (
        patch.object(cmd_list, "init_directories") as mock_init,
        patch.object(cmd_list, "get_resources_file") as mock_get_file,
        patch.object(cmd_list, "ensure_path") as mock_ensure,
        patch.object(cmd_list, "_sync_resources") as mock_sync,
        patch.object(GCPProjectConfig, "load_from_file") as mock_load_cfg,
        patch.object(cmd_list, "_print_resources") as mock_print,
    ):
        # Customize mock_args for this case
        mock_args.sync = True
        mock_args.asset_type = "compute"

        # The resources file is reported as missing.
        mock_path = MagicMock(spec=Path)
        mock_path.exists.return_value = False
        mock_path.parent = "fake_parent"
        mock_get_file.return_value = mock_path

        handle_list(None, mock_args)

        mock_init.assert_called_once()
        mock_get_file.assert_called_once_with("test-proj", "compute", resource_file_path=None)
        mock_ensure.assert_called_once_with("fake_parent")
        mock_sync.assert_called_once_with("test-proj", mock_path, "compute")
        mock_load_cfg.assert_called_once_with("test-proj", config_path=None)
        mock_print.assert_called_once()
+from pathlib import Path +from unittest.mock import MagicMock, mock_open, patch + +from airflow_google_provider_resource_cleanup.commands.cmd_tree import ( + _build_resource_tree, + _format_node_for_template, + handler_tree, + print_tree, +) + + +def test_format_node_for_template(): + node_details = { + "data": { + "name": "my-instance", + "type": "compute.googleapis.com/Instance", + "id": "12345", + "location": "us-central1-a", + "is_protected": True, + }, + "children": { + "child1": { + "data": {"name": "disk-b", "type": "disk"}, + "children": {}, + }, + "child2": { + "data": {"name": "disk-a", "type": "disk"}, + "children": {}, + }, + }, + } + + formatted = _format_node_for_template(node_details) + + assert formatted["display_name"] == "my-instance" + assert formatted["asset_type_short"] == "Instance" + assert formatted["is_protected"] is True + assert len(formatted["children"]) == 2 + assert formatted["children"][0]["display_name"] == "disk-a" + assert formatted["children"][1]["display_name"] == "disk-b" + + +def test_build_resource_tree(): + with patch("airflow_google_provider_resource_cleanup.commands.cmd_tree.check_white_list") as mock_check: + mock_check.return_value = True + + resources_data = [ + { + "name": ("//compute.googleapis.com/projects/p1/zones/z1/instances/i1"), + "displayName": "instance-1", + "assetType": "compute.googleapis.com/Instance", + "parentFullResourceName": ("//cloudresourcemanager.googleapis.com/projects/p1"), + }, + { + "name": "//cloudresourcemanager.googleapis.com/projects/p1", + "displayName": "my-project", + "assetType": "cloudresourcemanager.googleapis.com/Project", + }, + ] + + cfg = MagicMock() + tree = _build_resource_tree(resources_data, cfg) + + assert len(tree) == 1 + assert tree[0]["display_name"] == "my-project" + assert len(tree[0]["children"]) == 1 + assert tree[0]["children"][0]["display_name"] == "instance-1" + + +def test_print_tree(capsys): + nodes_list = [ + { + "display_name": "Parent", + "asset_type_short": 
"Project", + "location": "global", + "is_protected": True, + "children": [ + { + "display_name": "Child", + "asset_type_short": "Instance", + "location": "us-central1", + "is_protected": False, + "children": [], + } + ], + } + ] + + print_tree(nodes_list) + + captured = capsys.readouterr() + assert "- Parent (Project @ global) (Protected)" in captured.out + assert "\t- Child (Instance @ us-central1)" in captured.out + + +def test_handler_tree(mock_args): + with ( + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree.init_directories") as mock_init, + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree.get_resources_file") as mock_get_file, + patch( + "airflow_google_provider_resource_cleanup.commands.cmd_tree.GCPProjectConfig.load_from_file" + ) as mock_load_cfg, + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree.load_json") as mock_load_json, + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree._build_resource_tree") as mock_build, + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree._get_html_template") as mock_get_template, + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree._get_html_file") as mock_get_file_path, + patch("airflow_google_provider_resource_cleanup.commands.cmd_tree.run_command") as mock_run, + ): + # Mock setup + mock_resource_file = MagicMock(spec=Path) + mock_resource_file.exists.return_value = True + mock_get_file.return_value = mock_resource_file + + mock_template = MagicMock() + mock_template.render.return_value = "Content" + mock_get_template.return_value = mock_template + + mock_html_file = MagicMock(spec=Path) + mock_get_file_path.return_value = mock_html_file + m_open = mock_open() + mock_html_file.open.return_value = m_open() + + handler_tree(None, mock_args) + + mock_init.assert_called_once() + mock_load_cfg.assert_called_once_with("test-proj", config_path=None) + mock_load_json.assert_called_once_with(mock_resource_file) + 
mock_build.assert_called_once() + mock_run.assert_called() diff --git a/providers/openlineage/src/airflow/providers/openlineage/hooks/__init__.py b/providers/openlineage/src/airflow/providers/openlineage/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/openlineage/src/airflow/providers/openlineage/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.