Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/BACKUP.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ python scripts/backup.py input.csv conf.yaml -p path/to/profiles.yaml --profile
## Configuration file (conf)
The configuration files let you define which type of storage the export tool will save the backups to, and any additional storage-specific information that might be required. Currently AWS S3 and Local storage are supported.

If you run the script with `list-of-parents` or `entire-organization`, the script will fetch the IDs of workspaces to process (either hierarchies under the specified parents or all the workspaces within the organization) in batches. As a default, the batch size is set to `100`, but you can parametrize it by setting the `api_page_size` parametter in your configuration yaml.
If you run the script with `list-of-parents` or `entire-organization`, the script will fetch the IDs of workspaces to process (either hierarchies under the specified parents or all the workspaces within the organization) in batches. As a default, the batch size is set to `100`, but you can parametrize it by setting the `api_page_size` parameter in your configuration yaml.

The `batch_size` is an optional parameter which accepts integer value and determines how many workspaces will be processed before saving the backups to the selected storage. As a default, the batch size is set to `100`. If you want to set a different batch size, you can specify so in the configuration yaml.

The configuration file has the following format:
```yaml
Expand All @@ -78,8 +80,10 @@ storage:
arg1: foo
arg2: bar
api_page_size: 1000
batch_size: 20
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

20 or 100? 🤔

```


### AWS S3

You can define the configuration file for S3 storage like so:
Expand All @@ -96,7 +100,7 @@ Here, the meaning of different `storage` fields is as follows:
- backup_path - absolute path within the S3 bucket which leads to the root directory where the backups should be saved
- profile (optional) - AWS profile to be used

## Local Storage
### Local Storage

```yaml
storage_type: local
Expand Down
169 changes: 121 additions & 48 deletions scripts/backup.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,52 @@
# (C) 2025 GoodData Corporation
import abc
import argparse
import datetime
import json
import logging
import os
import shutil
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Type

import boto3 # type: ignore[import]
import requests
import yaml
from gooddata_api_client.exceptions import NotFoundException
from gooddata_sdk import __version__ as sdk_version # type: ignore[import]
from gooddata_sdk.sdk import GoodDataSdk # type: ignore[import]
from gooddata_sdk.sdk import GoodDataSdk
from utils.backup_utils.input_loader import InputLoader # type: ignore[import]
from utils.constants import ( # type: ignore[import]
BackupSettings,
DirNames,
GoodDataProfile,
)
from utils.gd_api import ( # type: ignore[import]
BEARER_TKN_PREFIX,
GDApi,
GoodDataRestApiError,
)
from utils.logger import setup_logging # type: ignore[import]
from utils.models.batch import BackupBatch, Size # type: ignore[import]

setup_logging()
logger = logging.getLogger("backup")

TIMESTAMP_SDK_FOLDER = (
str(datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
+ "-"
+ sdk_version.replace(".", "_")
)

PROFILES_FILE = "profiles.yaml"
PROFILES_DIRECTORY = ".gooddata"
PROFILES_FILE_PATH = Path.home() / PROFILES_DIRECTORY / PROFILES_FILE


LAYOUTS_DIR = "gooddata_layouts"
LDM_DIR = "ldm"

API_PAGE_SIZE = 100
module_name = __file__.split(os.sep)[-1]
logger = logging.getLogger(module_name)


# TODO: consider moving storage related logic to a separate module and reuse it in restore
class BackupRestoreConfig:
def __init__(self, conf_path: str):
with open(conf_path, "r") as stream:
conf = yaml.safe_load(stream)
self.storage_type = conf["storage_type"]
self.storage = conf["storage"]
self.api_page_size = conf.get("api_page_size", API_PAGE_SIZE)
conf: dict = yaml.safe_load(stream)

self.storage_type: str = conf["storage_type"]
self.storage: dict[str, str] = conf["storage"]

page_size = conf.get("api_page_size", BackupSettings.DEFAULT_PAGE_SIZE)
self.api_page_size: Size = Size(size=page_size)

batch_size = conf.get("batch_size", BackupSettings.DEFAULT_BATCH_SIZE)
self.batch_size: Size = Size(size=batch_size)


class BackupStorage(abc.ABC):
Expand All @@ -69,11 +64,13 @@ def __init__(self, conf: BackupRestoreConfig):
self._config = conf.storage
self._profile = self._config.get("profile", "default")
self._session = self._create_boto_session(self._profile)
self._api = self._session.resource("s3")
self._bucket = self._api.Bucket(self._config["bucket"]) # type: ignore [missing library stubs]
self._resource = self._session.resource("s3")
self._bucket = self._resource.Bucket(self._config["bucket"]) # type: ignore [missing library stubs]
suffix = "/" if not self._config["backup_path"].endswith("/") else ""
self._backup_path = self._config["backup_path"] + suffix

self._verify_connection()

@staticmethod
def _create_boto_session(profile: str) -> boto3.Session:
try:
Expand All @@ -85,6 +82,17 @@ def _create_boto_session(profile: str) -> boto3.Session:

return boto3.Session()

def _verify_connection(self) -> None:
"""
Pings the S3 bucket to verify that the connection is working.
"""
try:
self._resource.meta.client.head_bucket(Bucket=self._config["bucket"])
except Exception as e:
raise RuntimeError(
f"Failed to connect to S3 bucket {self._config['bucket']}: {e}"
)

def export(self, folder, org_id) -> None:
"""Uploads the content of the folder to S3 as backup."""
storage_path = self._config["bucket"] + "/" + self._backup_path
Expand Down Expand Up @@ -149,9 +157,9 @@ def create_parser() -> argparse.ArgumentParser:
"-p",
"--profile-config",
type=Path,
default=PROFILES_FILE_PATH,
default=GoodDataProfile.PROFILE_PATH,
help="Optional path to GoodData profile config. "
f'If no path is provided, "{PROFILES_FILE_PATH}" is used.',
f'If no path is provided, "{GoodDataProfile.PROFILE_PATH}" is used.',
)
parser.add_argument(
"--profile",
Expand Down Expand Up @@ -239,7 +247,7 @@ def get_automations_from_api(api: GDApi, ws_id: str) -> Any:
response: requests.Response = requests.get(
f"{api.endpoint}/entities/workspaces/{ws_id}/automations?include=ALL",
headers={
"Authorization": f"{BEARER_TKN_PREFIX} {api.api_token}",
"Authorization": f"Bearer {api.api_token}",
"Content-Type": "application/vnd.gooddata.api+json",
},
)
Expand Down Expand Up @@ -301,7 +309,9 @@ def get_workspace_export(
"""
exported = False
for ws_id in workspaces_to_export:
export_path = Path(local_target_path, org_id, ws_id, TIMESTAMP_SDK_FOLDER)
export_path = Path(
local_target_path, org_id, ws_id, BackupSettings.TIMESTAMP_SDK_FOLDER
)

user_data_filters = get_user_data_filters(api, ws_id)
if not user_data_filters:
Expand All @@ -316,8 +326,8 @@ def get_workspace_export(
store_user_data_filters(user_data_filters, export_path, org_id, ws_id)
logger.info(f"Stored export for {ws_id}")
exported = True
except NotFoundException:
logger.error(f"Workspace {ws_id} does not exist. Skipping.")
except Exception as e:
logger.error(f"Skipping {ws_id}. Error encountered: {e}")

if not exported:
raise RuntimeError(
Expand All @@ -329,9 +339,9 @@ def archive_gooddata_layouts_to_zip(folder: str) -> None:
"""Archives the gooddata_layouts directory to a zip file."""
target_subdir = ""
for subdir, dirs, files in os.walk(folder):
if LAYOUTS_DIR in dirs:
if DirNames.LAYOUTS in dirs:
target_subdir = os.path.join(subdir, dirs[0])
if LDM_DIR in dirs:
if DirNames.LDM in dirs:
inner_layouts_dir = subdir + "/gooddata_layouts"
os.mkdir(inner_layouts_dir)
for dir in dirs:
Expand Down Expand Up @@ -382,6 +392,75 @@ def validate_args(args: argparse.Namespace) -> None:
)


def split_to_batches(
workspaces_to_export: list[str], batch_size: Size
) -> list[BackupBatch]:
"""Splits the list of workspaces to into batches of the specified size.
The batch is respresented as a list of workspace IDs.
Returns a list of batches (i.e. list of lists of IDs)
"""
list_of_batches = []
while workspaces_to_export:
batch = BackupBatch(workspaces_to_export[: batch_size.size])
workspaces_to_export = workspaces_to_export[batch_size.size :]
list_of_batches.append(batch)

return list_of_batches


def process_batch(
sdk: GoodDataSdk,
api: GDApi,
org_id: str,
storage: BackupStorage,
batch: BackupBatch,
retry_count: int = 0,
) -> None:
"""Processes a single batch of workspaces for backup.
If the batch processing fails, the function will wait
and retry with exponential backoff up to BackupSettings.MAX_RETRIES.
The base wait time is defined by BackupSettings.RETRY_DELAY.
"""
try:
with tempfile.TemporaryDirectory() as tmpdir:
get_workspace_export(sdk, api, tmpdir, org_id, batch.list_of_ids)

archive_gooddata_layouts_to_zip(str(Path(tmpdir, org_id)))

storage.export(tmpdir, org_id)

except Exception as e:
# Retry with exponential backoff until MAX_RETRIES, then raise the error
if retry_count < BackupSettings.MAX_RETRIES:
next_retry = retry_count + 1
logger.info(
f"Unexpected error while processing a batch. Retrying {next_retry}/{BackupSettings.MAX_RETRIES}..."
)
time.sleep(BackupSettings.RETRY_DELAY**next_retry)
process_batch(sdk, api, org_id, storage, batch, next_retry)
else:
logger.error(f"Error processing batch: {e}")
raise e


def process_batches_in_parallel(
sdk: GoodDataSdk,
api: GDApi,
org_id: str,
storage: BackupStorage,
batches: list[BackupBatch],
) -> None:
with ThreadPoolExecutor(max_workers=BackupSettings.MAX_WORKERS) as executor:
futures = []
for batch in batches:
futures.append(
executor.submit(process_batch, sdk, api, org_id, storage, batch)
)

for future in futures:
future.result()


def main(args: argparse.Namespace) -> None:
"""Main function for the backup script."""
sdk, api = create_client(args)
Expand All @@ -393,24 +472,18 @@ def main(args: argparse.Namespace) -> None:
storage_class: Type[BackupStorage] = get_storage(conf.storage_type)
storage: BackupStorage = storage_class(conf)

# TODO: if storage set to S3, check that valid connection can be established
# currently the script would gather the exports and only then fail to upload them

loader = InputLoader(api, conf.api_page_size)
workspaces_to_export: list[str] = loader.get_ids_to_backup(
args.input_type, args.ws_csv
)

if not workspaces_to_export:
logger.error("No workspaces to export. Check the input file or the input type.")
return
batches = split_to_batches(workspaces_to_export, conf.batch_size)

with tempfile.TemporaryDirectory() as tmpdir:
get_workspace_export(sdk, api, tmpdir, org_id, workspaces_to_export)

archive_gooddata_layouts_to_zip(str(Path(tmpdir, org_id)))
logger.info(
f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
)

storage.export(tmpdir, org_id)
process_batches_in_parallel(sdk, api, org_id, storage, batches)


if __name__ == "__main__":
Expand All @@ -421,6 +494,6 @@ def main(args: argparse.Namespace) -> None:
validate_args(args)
main(args)

logger.info("Backup completed.")
logger.info("Backup completed!")
except Exception as e:
logger.error(f"Backup failed: {e}")
Loading