-
Notifications
You must be signed in to change notification settings - Fork 3
Ar migrate storage builds drafts #2175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
388aa4c
40d5791
2c669ec
b1ae54f
ebf3161
3fe4439
3815ca2
296a88f
59cdffa
4e8b20e
74bd2a0
8258bb3
70ed00f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,196 @@ | ||
| from dataclasses import asdict | ||
| from datetime import datetime | ||
| from pathlib import Path | ||
|
|
||
| import pytz | ||
|
|
||
| from dcpy.configuration import ( | ||
| BUILD_NAME, | ||
| CI, | ||
| PUBLISHING_BUCKET, | ||
| PUBLISHING_BUCKET_ROOT_FOLDER, | ||
| ) | ||
| from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType | ||
| from dcpy.connectors.registry import VersionedConnector | ||
| from dcpy.models.connectors.edm.publishing import ( | ||
| BuildKey, | ||
| ) | ||
| from dcpy.utils import git, s3 | ||
| from dcpy.utils.logging import logger | ||
|
|
||
|
|
||
def _bucket() -> str:
    """Return the configured publishing bucket name, failing fast if unset.

    Raises:
        AssertionError: if the 'PUBLISHING_BUCKET' configuration value is
            falsy/undefined.
    """
    # Bug fix: the message previously said "edm.recipes connector", copied from
    # the recipes module — this is the edm.publishing connector.
    # NOTE(review): `assert` is stripped under `python -O`; consider raising
    # RuntimeError instead if callers must always get this check.
    assert PUBLISHING_BUCKET, (
        "'PUBLISHING_BUCKET' must be defined to use edm.publishing connector"
    )
    return PUBLISHING_BUCKET
|
|
||
|
|
||
# Suffixes used to decide whether a pull destination path names a file
# (vs. a directory to place the downloaded file into) — see the
# destination-path logic in BuildsConnector._pull.
# NOTE(review): ".text" looks like it may be a typo for ".txt" — confirm
# against actual build artifacts before changing.
_TEMP_PUBLISHING_FILE_SUFFIXES = {
    ".zip",
    ".parquet",
    ".csv",
    ".pdf",
    ".xlsx",
    ".json",
    ".text",
}
|
|
||
|
|
||
def get_builds(product: str) -> list[str]:
    """Return every build version found for *product*, newest-sorted first."""
    build_prefix = f"{product}/build/"
    subfolders = s3.get_subfolders(_bucket(), build_prefix)
    return sorted(subfolders, reverse=True)
|
|
||
|
|
||
class BuildsConnector(VersionedConnector, arbitrary_types_allowed=True):
    """Versioned connector for product builds in the edm.publishing bucket.

    Keys are product names and "versions" are build names/IDs. Backing
    storage is an S3-backed PathedStorageConnector that is created lazily
    on first access, so constructing this connector does not require S3
    configuration to be present.
    """

    conn_type: str = "edm.publishing.builds"
    # Backing storage; populated on first access via the `storage` property,
    # or injected through __init__ (e.g. for tests).
    _storage: PathedStorageConnector | None = None

    def __init__(self, storage: PathedStorageConnector | None = None, **kwargs):
        """Initialize BuildsConnector with optional pre-built storage."""
        super().__init__(**kwargs)
        if storage is not None:
            self._storage = storage

    @property
    def storage(self) -> PathedStorageConnector:
        """Lazy-loaded storage connector. Only initializes when first accessed."""
        if self._storage is None:
            self._storage = PathedStorageConnector.from_storage_kwargs(
                conn_type="edm.publishing.builds",
                storage_backend=StorageType.S3,
                s3_bucket=_bucket(),
                root_folder=PUBLISHING_BUCKET_ROOT_FOLDER,
                _validate_root_path=False,
            )
        return self._storage

    @staticmethod
    def create() -> "BuildsConnector":
        """Create a BuildsConnector with lazy-loaded S3 storage."""
        return BuildsConnector()

    def _generate_metadata(self) -> dict[str, str]:
        """Generates "standard" s3 metadata for our files"""
        metadata = {
            "date-created": datetime.now(pytz.timezone("America/New_York")).isoformat()
        }
        metadata["commit"] = git.commit_hash()
        if CI:
            # Only meaningful inside a CI run, where an action URL exists.
            metadata["run-url"] = git.action_url()
        return metadata

    def _upload_build(
        self,
        build_dir: Path,
        product: str,
        *,
        acl: s3.ACL | None = None,
        build_name: str | None = None,
        # max_files: int = s3.MAX_FILE_COUNT,  # TODO
    ) -> BuildKey:
        """
        Uploads a product build to an S3 bucket using cloudpathlib.

        This function handles uploading a local output folder to a specified
        location in an S3 bucket. The path, product, and build name must be
        provided, along with an optional ACL (Access Control List) to control
        file access in S3.

        Raises:
            FileNotFoundError: If the provided output_path does not exist.
            ValueError: If the build name is not provided and cannot be found
                in the environment variables.
        """
        if not build_dir.exists():
            raise FileNotFoundError(f"Path {build_dir} does not exist")
        build_name = build_name or BUILD_NAME
        if not build_name:
            raise ValueError(
                f"Build name supplied via CLI or the env var 'BUILD_NAME' cannot be '{build_name}'."
            )
        build_key = BuildKey(product, build_name)

        logger.info(f'Uploading {build_dir} to {build_key.path} with ACL "{acl}"')
        self.storage.push(
            key=build_key.path,
            filepath=str(build_dir),
            # Bug fix: str(acl) turned a missing ACL (None) into the literal
            # string "None"; forward None unchanged when no ACL was requested.
            acl=str(acl) if acl is not None else None,
            metadata=self._generate_metadata(),
        )

        return build_key

    def push_versioned(self, key: str, version: str, **kwargs) -> dict:
        """Push a build. `key` is the product name; `version` is the build name.

        Expects `kwargs["build_path"]` (local build directory) and optionally
        `kwargs["connector_args"]["acl"]` (string ACL name).
        """
        # For builds, the "version" is the build name/ID.
        # Robustness: tolerate a missing/None connector_args mapping.
        connector_args = kwargs.get("connector_args") or {}
        acl = (
            s3.string_as_acl(connector_args["acl"])
            if connector_args.get("acl")
            else None
        )

        logger.info(f"Pushing build for product: {key}, build: {version}")
        result = self._upload_build(
            build_dir=kwargs["build_path"],
            product=key,
            acl=acl,
            build_name=version,
        )
        return asdict(result)

    def _pull(
        self,
        key: str,
        version: str,
        destination_path: Path,
        *,
        filepath: str = "",
        **kwargs,
    ) -> dict:
        """Download one file from a build to a local path.

        If `destination_path` has a recognized file suffix it is treated as
        the output file itself; otherwise it is treated as a directory and
        the source filename is appended.

        Raises:
            FileNotFoundError: if the source key does not exist in storage.
        """
        build_key = BuildKey(key, version)

        source_key = f"{build_key.path}/{filepath}"

        if not self.storage.exists(source_key):
            raise FileNotFoundError(f"File {source_key} not found")

        # Determine output path: suffix present -> caller named a file;
        # otherwise drop the source's filename into the destination folder.
        is_file_path = destination_path.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES
        output_filepath = (
            destination_path / Path(filepath).name
            if not is_file_path
            else destination_path
        )

        logger.info(
            f"Downloading {build_key}, {filepath}, {source_key} -> {output_filepath}"
        )

        self.storage.pull(key=source_key, destination_path=output_filepath)
        return {"path": output_filepath}

    def pull_versioned(
        self, key: str, version: str, destination_path: Path, **kwargs
    ) -> dict:
        """Pull a file from the build identified by (`key`, `version`)."""
        return self._pull(key, version, destination_path, **kwargs)

    def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
        """List all build versions (build names) for a product."""
        build_folder_key = f"{key}/build"
        if not self.storage.exists(build_folder_key):
            return []

        return sorted(self.storage.get_subfolders(build_folder_key), reverse=sort_desc)

    def get_latest_version(self, key: str, **kwargs) -> str:
        """Builds don't have a meaningful 'latest' version concept."""
        raise NotImplementedError(
            "Builds don't have a meaningful 'latest' version. Use list_versions() to see available builds."
        )

    def version_exists(self, key: str, version: str, **kwargs) -> bool:
        """Check if a specific build exists."""
        return version in self.list_versions(key)

    def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path:
        """Local sub-path under which a pulled build for `key`/`version` lives."""
        return Path("edm") / "builds" / "datasets" / key / version
|
Comment on lines
+195
to
+196
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bit of a nit, but this would presumably be so someone else (i.e. the lifecycle code pulling this) knows where to put it right? Can we not do "datasets" then? builds/datasets makes me think of what's going into a build. Even if it's "data_products". Or would it just make sense to have this match what's in DO? as in, key / builds / version?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to review how this even works, honestly... The |
||
Uh oh!
There was an error while loading. Please reload this page.