From 388aa4c17ab1f5661127017b8979b3568435cebc Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Sat, 17 Jan 2026 17:47:19 -0500 Subject: [PATCH 01/13] Comment out bytes test for now This was taking a long time. I think there's absolutely utility in running a scraper against BYTES every week or so, but this is just too much --- dcpy/test_integration/connectors/edm/test_bytes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dcpy/test_integration/connectors/edm/test_bytes.py b/dcpy/test_integration/connectors/edm/test_bytes.py index 823d2e2d64..8f1c307dfc 100644 --- a/dcpy/test_integration/connectors/edm/test_bytes.py +++ b/dcpy/test_integration/connectors/edm/test_bytes.py @@ -4,7 +4,7 @@ from dcpy.utils.logging import logger -def test_bytes_versions_are_retrieved(): +def _test_bytes_versions_are_retrieved(): versions = BytesConnector().fetch_all_latest_versions_df() with_errors = versions.loc[versions["version_fetch_error"].astype(bool)] if not with_errors.empty: From 40d5791003602f21bc7b92461c086ec42d797d00 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Tue, 20 Jan 2026 12:42:16 -0500 Subject: [PATCH 02/13] Update sample .env Having to re-populate my .env after an *unfortunate* incident (see two commits ahead) prompted this. It's a little out of date. 
--- example.env | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/example.env b/example.env index b6a7ca57fb..ac6f826d63 100644 --- a/example.env +++ b/example.env @@ -1,6 +1,10 @@ BUILD_ENGINE=postgresql://postgres:postgres@localhost:5432/postgres -AWS_S3_ENDPOINT= +AWS_S3_ENDPOINT=https://nyc3.digitaloceanspaces.com AWS_SECRET_ACCESS_KEY= AWS_ACCESS_KEY_ID= -RECIPES_BUCKET= -RECIPE_ENGINE= \ No newline at end of file +RECIPES_BUCKET=edm-recipes +RECIPE_ENGINE= +PUBLISHING_BUCKET=edm-publishing +PUBLISHING_BUCKET_ROOT_FOLDER= +PRODUCT_METADATA_REPO_PATH=../product-metadata/ +TEMPLATE_DIR=./ingest_templates From 2c669ec9cb645114e74c82b44ba5fa6e526f4534 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 16 Jan 2026 13:10:03 -0500 Subject: [PATCH 03/13] Add pub root folder to config --- dcpy/configuration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dcpy/configuration.py b/dcpy/configuration.py index d31d5cb43f..679d313715 100644 --- a/dcpy/configuration.py +++ b/dcpy/configuration.py @@ -16,6 +16,7 @@ DEFAULT_S3_URL = "https://nyc3.digitaloceanspaces.com" PUBLISHING_BUCKET = env.get("PUBLISHING_BUCKET") +PUBLISHING_BUCKET_ROOT_FOLDER: str = env.get("PUBLISHING_BUCKET_ROOT_FOLDER", "") LOGGING_DB = "edm-qaqc" LOGGING_SCHEMA = "product_data" From b1ae54ff5a627e979607fbdfa2d1a91756e9c459 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Tue, 20 Jan 2026 12:42:03 -0500 Subject: [PATCH 04/13] DANGER ZONE! remove rmtree from hps This was... maybe the worst two lines of code I have ever written. 
If you used this connector to attempt to download a file to, say, your data-engineering/ directory, it would wipe the directory --- dcpy/connectors/hybrid_pathed_storage.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dcpy/connectors/hybrid_pathed_storage.py b/dcpy/connectors/hybrid_pathed_storage.py index 682723b101..731bc53e0d 100644 --- a/dcpy/connectors/hybrid_pathed_storage.py +++ b/dcpy/connectors/hybrid_pathed_storage.py @@ -282,9 +282,6 @@ def pull(self, key: str, destination_path: Path, **kwargs) -> dict: if not src_path.exists(): raise FileNotFoundError(f"Source path {src_path} does not exist") if src_path.is_dir(): - # Recursively copy directory - if destination_path.exists(): - shutil.rmtree(destination_path) src_path.copytree(destination_path) else: destination_path.parent.mkdir(parents=True, exist_ok=True) From ebf31619e32e118acbf6eb3d0aadc5d585528fa4 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 16 Jan 2026 13:15:59 -0500 Subject: [PATCH 05/13] Configure the connectors in lifecycle builds --- dcpy/lifecycle/builds/connector.py | 20 +++++++++++++++++++- dcpy/lifecycle/config.py | 3 +++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/dcpy/lifecycle/builds/connector.py b/dcpy/lifecycle/builds/connector.py index 5074e3539a..0888dd942d 100644 --- a/dcpy/lifecycle/builds/connector.py +++ b/dcpy/lifecycle/builds/connector.py @@ -1,5 +1,5 @@ -from dcpy.lifecycle.connector_registry import connectors from dcpy.lifecycle import config +from dcpy.lifecycle.connector_registry import connectors LIFECYCLE_STAGE = "builds" @@ -10,3 +10,21 @@ def get_recipes_default_connector(): return connectors.versioned[ config.stage_config(LIFECYCLE_STAGE)["default_recipes_connector"] ] + + +def get_builds_default_connector(): + return connectors.versioned[ + config.stage_config(LIFECYCLE_STAGE)["default_builds_connector"] + ] + + +def get_drafts_default_connector(): + return connectors.versioned[ + 
config.stage_config(LIFECYCLE_STAGE)["default_drafts_connector"] + ] + + +def get_published_default_connector(): + return connectors.versioned[ + config.stage_config(LIFECYCLE_STAGE)["default_published_connector"] + ] diff --git a/dcpy/lifecycle/config.py b/dcpy/lifecycle/config.py index 5c0f14e4bc..75bca6f67a 100644 --- a/dcpy/lifecycle/config.py +++ b/dcpy/lifecycle/config.py @@ -23,6 +23,9 @@ def _set_default_conf(): }, "builds": { "default_recipes_connector": "edm.recipes.datasets", + "default_builds_connector": "edm.publishing.builds", + "default_drafts_connector": "edm.publishing.drafts", + "default_published_connector": "edm.publishing.published", "local_data_path": "builds", "stages": { "plan": {"local_data_path": "plan"}, From 3fe443914c84a476825e871f37241e508eaf48ce Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 16 Jan 2026 13:07:33 -0500 Subject: [PATCH 06/13] Add builds conn --- dcpy/connectors/edm/builds.py | 199 ++++++++++++++++ dcpy/lifecycle/builds/artifacts/builds.py | 215 ++++++++++++++++++ .../connectors/edm/test_builds_conn.py | 162 +++++++++++++ 3 files changed, 576 insertions(+) create mode 100644 dcpy/connectors/edm/builds.py create mode 100644 dcpy/lifecycle/builds/artifacts/builds.py create mode 100644 dcpy/test_integration/connectors/edm/test_builds_conn.py diff --git a/dcpy/connectors/edm/builds.py b/dcpy/connectors/edm/builds.py new file mode 100644 index 0000000000..707780dba2 --- /dev/null +++ b/dcpy/connectors/edm/builds.py @@ -0,0 +1,199 @@ +from dataclasses import asdict +from datetime import datetime +from pathlib import Path + +import pytz + +from dcpy.configuration import ( + BUILD_NAME, + CI, + PUBLISHING_BUCKET, + PUBLISHING_BUCKET_ROOT_FOLDER, +) +from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType +from dcpy.connectors.registry import VersionedConnector +from dcpy.models.connectors.edm.publishing import ( + BuildKey, +) +from dcpy.utils import git, s3 +from dcpy.utils.logging import 
logger + + +def _bucket() -> str: + assert PUBLISHING_BUCKET, ( + "'PUBLISHING_BUCKET' must be defined to use edm.recipes connector" + ) + return PUBLISHING_BUCKET + + +_TEMP_PUBLISHING_FILE_SUFFIXES = { + ".zip", + ".parquet", + ".csv", + ".pdf", + ".xlsx", + ".json", + ".text", +} + + +def get_builds(product: str) -> list[str]: + """Get all build versions for a product.""" + return sorted(s3.get_subfolders(_bucket(), f"{product}/build/"), reverse=True) + + +class BuildsConnector(VersionedConnector, arbitrary_types_allowed=True): + conn_type: str = "edm.publishing.builds" + _storage: PathedStorageConnector | None = None + + def __init__(self, storage: PathedStorageConnector | None = None, **kwargs): + """Initialize BuildsConnector with optional storage.""" + super().__init__(**kwargs) + if storage is not None: + self._storage = storage + + @property + def storage(self) -> PathedStorageConnector: + """Lazy-loaded storage connector. Only initializes when first accessed.""" + if self._storage is None: + self._storage = PathedStorageConnector.from_storage_kwargs( + conn_type="edm.publishing.builds", + storage_backend=StorageType.S3, + s3_bucket=_bucket(), + root_folder=PUBLISHING_BUCKET_ROOT_FOLDER, + _validate_root_path=False, + ) + return self._storage + + @staticmethod + def create() -> "BuildsConnector": + """Create a BuildsConnector with lazy-loaded S3 storage.""" + return BuildsConnector() + + def _generate_metadata(self) -> dict[str, str]: + """Generates "standard" s3 metadata for our files""" + metadata = { + "date-created": datetime.now(pytz.timezone("America/New_York")).isoformat() + } + metadata["commit"] = git.commit_hash() + if CI: + metadata["run-url"] = git.action_url() + return metadata + + def _upload_build( + self, + build_dir: Path, + product: str, + *, + acl: s3.ACL | None = None, + build_name: str | None = None, + # max_files: int = s3.MAX_FILE_COUNT, # TODO + ) -> BuildKey: + """ + Uploads a product build to an S3 bucket using cloudpathlib. 
+ + This function handles uploading a local output folder to a specified + location in an S3 bucket. The path, product, and build name must be + provided, along with an optional ACL (Access Control List) to control + file access in S3. + + Raises: + FileNotFoundError: If the provided output_path does not exist. + ValueError: If the build name is not provided and cannot be found in the environment variables. + """ + if not build_dir.exists(): + raise FileNotFoundError(f"Path {build_dir} does not exist") + build_name = build_name or BUILD_NAME + if not build_name: + raise ValueError( + f"Build name supplied via CLI or the env var 'BUILD_NAME' cannot be '{build_name}'." + ) + build_key = BuildKey(product, build_name) + + logger.info(f'Uploading {build_dir} to {build_key.path} with ACL "{acl}"') + self.storage.push( + key=build_key.path, + filepath=str(build_dir), + acl=str(acl), + metadata=self._generate_metadata(), + ) + + return build_key + + def push_versioned(self, key: str, version: str, **kwargs) -> dict: + # For builds, the "version" is the build name/ID + connector_args = kwargs["connector_args"] + acl = ( + s3.string_as_acl(connector_args["acl"]) + if connector_args.get("acl") + else None + ) + + logger.info(f"Pushing build for product: {key}, build: {version}") + result = self._upload_build( + build_dir=kwargs["build_path"], + product=key, + acl=acl, + build_name=version, + ) + return asdict(result) + + def _pull( + self, + key: str, + version: str, + destination_path: Path, + *, + filepath: str = "", + **kwargs, + ) -> dict: + build_key = BuildKey(key, version) + + # Construct the source key for the file + source_key = f"{build_key.path}/{filepath}" + + # Check if the file exists + if not self.storage.exists(source_key): + raise FileNotFoundError(f"File {source_key} not found") + + # Determine output path + is_file_path = destination_path.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES + output_filepath = ( + destination_path / Path(filepath).name + if not 
is_file_path + else destination_path + ) + + logger.info( + f"Downloading {build_key}, {filepath}, {source_key} -> {output_filepath}" + ) + + # Use PathedStorageConnector's pull method + self.storage.pull(key=source_key, destination_path=output_filepath) + return {"path": output_filepath} + + def pull_versioned( + self, key: str, version: str, destination_path: Path, **kwargs + ) -> dict: + return self._pull(key, version, destination_path, **kwargs) + + def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: + """List all build versions (build names) for a product.""" + build_folder_key = f"{key}/build" + if not self.storage.exists(build_folder_key): + return [] + + return sorted(self.storage.get_subfolders(build_folder_key), reverse=sort_desc) + + def get_latest_version(self, key: str, **kwargs) -> str: + """Builds don't have a meaningful 'latest' version concept.""" + raise NotImplementedError( + "Builds don't have a meaningful 'latest' version. Use list_versions() to see available builds." 
+ ) + + def version_exists(self, key: str, version: str, **kwargs) -> bool: + """Check if a specific build exists.""" + return version in self.list_versions(key) + + def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path: + return Path("edm") / "builds" / "datasets" / key / version diff --git a/dcpy/lifecycle/builds/artifacts/builds.py b/dcpy/lifecycle/builds/artifacts/builds.py new file mode 100644 index 0000000000..be958aa062 --- /dev/null +++ b/dcpy/lifecycle/builds/artifacts/builds.py @@ -0,0 +1,215 @@ +import json +from datetime import datetime +from pathlib import Path +from tempfile import TemporaryDirectory + +import pytz +import yaml + +from dcpy.configuration import BUILD_NAME, CI +from dcpy.lifecycle.builds.artifacts import drafts +from dcpy.lifecycle.builds.connector import ( + get_builds_default_connector, + get_drafts_default_connector, +) +from dcpy.models.connectors.edm.publishing import BuildKey, DraftKey +from dcpy.models.lifecycle.builds import BuildMetadata +from dcpy.utils import git, versions +from dcpy.utils.logging import logger + + +def _generate_metadata(running_in_gha: bool = False) -> dict[str, str]: + """Generates "standard" s3 metadata for our files""" + metadata = { + "date-created": datetime.now(pytz.timezone("America/New_York")).isoformat() + } + metadata["commit"] = git.commit_hash() + if running_in_gha: + metadata["run-url"] = git.action_url() + return metadata + + +def build_exists(product: str, build: str): + return build in get_builds_default_connector().list_versions( + product, sort_desc=True + ) + + +def get_build_metadata(product: str, build: str) -> BuildMetadata: + """Retrieve build metadata using builds connector.""" + builds_conn = get_builds_default_connector() + assert build_exists(product, build), ( + f"No build exists for product={product}, build={build}" + ) + with TemporaryDirectory() as temp_dir: + result = builds_conn.pull_versioned( + key=product, + version=build, + 
filepath="build_metadata.json", + destination_path=Path(temp_dir), + ) + metadata_path = result.get("path") + if not metadata_path or not Path(metadata_path).exists(): + raise FileNotFoundError(f"Build metadata not found for {product}/{build}") + + return BuildMetadata(**yaml.safe_load(open(metadata_path).read())) + + +def get_version(product: str, build: str) -> str: + """Get version from build metadata.""" + return get_build_metadata(product, build).version + + +def _copy_to_draft( + product: str, build: str, draft_version: str, acl: str = "public-read" +) -> None: + """Copy all files from build to draft using connectors. + + Note: draft_version is in the form of {dataset version}.{revision num and label} + e.g. 24v1.1-my-build, where 24v1 is dataset version, and 1-my-build is the + revision and label. + """ + with TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Pull entire build folder to temp + get_builds_default_connector().pull_versioned( + key=product, + version=build, + destination_path=temp_path, + connector_args={"build_note": build}, + ) + + # Push entire folder to draft + get_drafts_default_connector().push_versioned( + key=product, + version=draft_version, + source_path=temp_path, + acl=acl, + ) + + logger.info( + f"Copied build {product}/{build} to draft {product}/{draft_version}" + ) + + +def upload( + output_path: Path, + product: str, + build: str | None = None, + acl: str = "", # TODO: keep in mind when you swap the references in gha +) -> BuildKey: + """ + Uploads a product build using the configured builds connector. + + Args: + output_path: Path to local output folder to upload + product: Name of data product + build: Label of build (defaults to BUILD_NAME env var) + acl: Access control level for uploaded files + + Returns: + BuildKey: The key for the uploaded build + + Raises: + FileNotFoundError: If the provided output_path does not exist. 
+ ValueError: If the build name is not provided and cannot be found in environment variables. + """ + if not output_path.is_dir(): + raise FileNotFoundError(f"Path {output_path} should exist and be a dir") + + build_name = build or BUILD_NAME + if not build_name: + raise ValueError( + f"Build name supplied via CLI or the env var 'BUILD_NAME' cannot be '{build_name}'." + ) + + build_key = BuildKey(product, build_name) + meta = _generate_metadata(running_in_gha=CI) # for now, same thing. + + logger.info(f"Uploading {output_path} to {build_key.path}") + result = get_builds_default_connector().push_versioned( + key=product, + version=build_name, + build_path=output_path, + connector_args={"build_note": build_name, "acl": acl, "metadata": meta}, + ) + + logger.info(f"Successfully uploaded build: {result}") + return build_key + + +def promote_to_draft( + product: str, + build: str, + acl: str = "", + # keep_build: bool = True, # TODO: implement delete + draft_revision_summary: str = "", + metadata_file_dir: Path | None = None, +) -> DraftKey: + """ + Promotes a product build to draft using the configured connectors. + + Args: + product: Name of data product + build: Label of build to promote + acl: Access control level for uploaded files + keep_build: Whether to keep the original build after promotion + draft_revision_summary: Summary description for the draft revision + metadata_file_dir: Optional dir to save metadata file after promotion + + Returns: + DraftKey: The key for the created draft + + Raises: + FileNotFoundError: If the build does not exist. + ValueError: If required metadata is missing. 
+ """ + logger.info(f'Promoting {product} {build} to draft with ACL "{acl}"') + + # Figure out version stuff + version = get_version(product, build) + draft_revision_number = ( + len(drafts.get_dataset_version_revisions(product, version)) + 1 + ) + draft_revision_label = versions.DraftVersionRevision( + draft_revision_number, draft_revision_summary + ).label + + # promote from build to draft + draft_key = DraftKey(product, version, draft_revision_label) + draft_version_revision = f"{version}.{draft_revision_label}" + _copy_to_draft(product, build, draft_version_revision, acl) + + # Push updated metadata file to draft + # TODO: We could push this file with the initial go above, + # but we'll probably implement an s3/azure copy, and short-circuit + # the download step above + with TemporaryDirectory() as tmpdirname: + build_metadata = get_build_metadata(product, build) + build_metadata.draft_revision_name = draft_revision_label + md_file_path = (metadata_file_dir or Path(tmpdirname)) / "build_metadata.json" + + md_file_path.write_text( + json.dumps(build_metadata.model_dump(mode="json"), indent=4) + ) + get_drafts_default_connector().push_versioned( + key=product, + version=draft_version_revision, + source_path=md_file_path, + target_path="build_metadata.json", + acl=acl, + ) + logger.info( + f"Updated metadata for {product}/{draft_version_revision} with draft revision {draft_revision_label}" + ) + + logger.info(f"Promoted {product} to drafts as {version}/{draft_revision_label}") + + # TODO: Implement build deletion via connector + # if not keep_build: + # logger.info( + # f"Would delete build {product}/{build} (deletion not implemented yet)" + # ) + + return draft_key diff --git a/dcpy/test_integration/connectors/edm/test_builds_conn.py b/dcpy/test_integration/connectors/edm/test_builds_conn.py new file mode 100644 index 0000000000..5e8919f835 --- /dev/null +++ b/dcpy/test_integration/connectors/edm/test_builds_conn.py @@ -0,0 +1,162 @@ +import time +import uuid 
+from pathlib import Path + +import pytest + +from dcpy.connectors.edm.builds import BuildsConnector +from dcpy.connectors.hybrid_pathed_storage import ( + PathedStorageConnector, + StorageType, +) + + +def calculate_test_path(): + return ( + Path("integration_tests") + / "edm" + / "connectors" + / "builds" + / f"{int(time.time())}_{str(uuid.uuid4())[:8]}" + ) + + +def _make_fake_builds(): + """Create test product and build combinations.""" + products = ["product1", "product2"] + builds = ["ar-distribution", "dm-dbt", "fvk-cscl"] + combos = [] + for product in products: + for build in builds: + combos.append((product, build)) + return combos + + +def _create_build_files(conn: BuildsConnector, tmp_path: Path): + """Create fake build files in the storage.""" + # Create temporary files locally then push them to storage + local_staging = tmp_path / "staging" + local_staging.mkdir(parents=True, exist_ok=True) + + for product, build in _make_fake_builds(): + # Create local build folder with some files + build_folder = local_staging / f"{product}_{build}" + build_folder.mkdir(parents=True, exist_ok=True) + + # Create some mock build artifacts + (build_folder / "build.zip").write_text( + f"Mock build data for {product} build {build}" + ) + (build_folder / "metadata.json").write_text( + f'{{"product": "{product}", "build": "{build}"}}' + ) + + # Push to storage using the connector's interface + # For builds, the structure is: {product}/build/{build}/ + storage_key = f"{product}/build/{build}" + conn.storage.push(key=storage_key, filepath=str(build_folder)) + + +@pytest.fixture +def local_builds_conn(tmp_path): + """Create a Builds connector with local storage for testing.""" + # Create the root directory first + root_dir = tmp_path / "builds" + root_dir.mkdir(parents=True, exist_ok=True) + + # Create connector with local storage instead of S3 + storage = PathedStorageConnector.from_storage_kwargs( + conn_type="edm.publishing.builds.test", + 
storage_backend=StorageType.LOCAL, + local_dir=root_dir, + ) + conn = BuildsConnector(storage=storage) + _create_build_files(conn, tmp_path) + return conn + + +def _test_builds_connector_interface(conn: BuildsConnector, tmp_path): + """Test the basic builds connector interface methods.""" + expected_builds = _make_fake_builds() + # e.g. [('product1', 'ar-distribution'), ('product1', 'dm-dbt'), ...] + + expected_products = {product for product, _ in expected_builds} + expected_builds_by_product: dict[str, list[str]] = {} + for product, build in expected_builds: + if product not in expected_builds_by_product: + expected_builds_by_product[product] = [] + expected_builds_by_product[product].append(build) + + # Check that all builds exist + for product, build in expected_builds: + assert conn.version_exists(product, version=build), ( + f"Build {build} should exist for product {product}" + ) + + # Test listing versions (builds) + for product in expected_products: + builds = conn.list_versions(product) + expected_builds_list = sorted(expected_builds_by_product[product], reverse=True) + assert builds == expected_builds_list, ( + f"Builds for {product} should match expected: {builds} vs {expected_builds_list}" + ) + + # Test pull_versioned + for product, build in expected_builds: + destination_dir = tmp_path / "downloads" / product / build + destination_dir.mkdir(parents=True, exist_ok=True) + + # Pull a specific file from the build + result = conn.pull_versioned( + key=product, + version=build, + destination_path=destination_dir, + filepath="build.zip", + ) + + pulled_path = result.get("path") + assert pulled_path is not None, f"Should return a path for {product}:{build}" + assert Path(pulled_path).exists(), ( + f"Downloaded file should exist: {pulled_path}" + ) + assert Path(pulled_path).name == "build.zip", ( + "Downloaded file should be named build.zip" + ) + + +def _test_builds_connector_error_cases(conn: BuildsConnector, tmp_path): + """Test error cases and edge 
conditions.""" + # Test non-existent product + assert conn.list_versions("nonexistent_product") == [], ( + "Non-existent product should return empty list" + ) + + # Test non-existent build for existing product + existing_product = "product1" + assert not conn.version_exists(existing_product, "nonexistent-build"), ( + "Non-existent build should return False" + ) + + # Test get_latest_version should raise NotImplementedError + with pytest.raises( + NotImplementedError, match="Builds don't have a meaningful 'latest' version" + ): + conn.get_latest_version(existing_product) + + # Test pulling non-existent file + destination_dir = tmp_path / "downloads" / "test_error" + destination_dir.mkdir(parents=True, exist_ok=True) + + with pytest.raises(FileNotFoundError, match="File .* not found"): + conn.pull_versioned( + key="product1", + version="ar-distribution", + destination_path=destination_dir, + filepath="nonexistent_file.txt", + ) + + +def test_local_builds_conn(local_builds_conn: BuildsConnector, tmp_path): + """Test the Builds connector with local storage.""" + _test_builds_connector_interface(local_builds_conn, tmp_path) + _test_builds_connector_error_cases(local_builds_conn, tmp_path) From 3815ca2f81ba0c03f9f1b15823c95bb46b9d36ac Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 16 Jan 2026 13:08:32 -0500 Subject: [PATCH 07/13] Add drafts conn --- dcpy/connectors/edm/drafts.py | 217 ++++++++++++++++++ dcpy/lifecycle/builds/artifacts/drafts.py | 160 +++++++++++++ .../connectors/edm/test_drafts.py | 217 ++++++++++++++++++ 3 files changed, 594 insertions(+) create mode 100644 dcpy/connectors/edm/drafts.py create mode 100644 dcpy/lifecycle/builds/artifacts/drafts.py create mode 100644 dcpy/test_integration/connectors/edm/test_drafts.py diff --git a/dcpy/connectors/edm/drafts.py b/dcpy/connectors/edm/drafts.py new file mode 100644 index 0000000000..459dd35591 --- /dev/null +++ b/dcpy/connectors/edm/drafts.py @@ -0,0 +1,217 @@ +from pathlib import Path + +from 
dcpy.configuration import ( + PUBLISHING_BUCKET, + PUBLISHING_BUCKET_ROOT_FOLDER, +) +from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType +from dcpy.connectors.registry import VersionedConnector +from dcpy.models.connectors.edm.publishing import ( + DraftKey, + ProductKey, +) +from dcpy.utils.logging import logger + +_TEMP_PUBLISHING_FILE_SUFFIXES = { + ".zip", + ".parquet", + ".csv", + ".pdf", + ".xlsx", + ".json", + ".text", +} + + +def _bucket() -> str: + assert PUBLISHING_BUCKET, ( + "'PUBLISHING_BUCKET' must be defined to use edm.recipes connector" + ) + return PUBLISHING_BUCKET + + +class DraftsConnector(VersionedConnector, arbitrary_types_allowed=True): + conn_type: str = "edm.publishing.drafts" + _storage: PathedStorageConnector | None = None + + def __init__(self, storage: PathedStorageConnector | None = None, **kwargs): + """Initialize DraftsConnector with optional storage.""" + super().__init__(**kwargs) + if storage is not None: + self._storage = storage + + @property + def storage(self) -> PathedStorageConnector: + """Lazy-loaded storage connector. Only initializes when first accessed.""" + if self._storage is None: + self._storage = PathedStorageConnector.from_storage_kwargs( + conn_type="edm.publishing.drafts", + storage_backend=StorageType.S3, + s3_bucket=_bucket(), + root_folder=PUBLISHING_BUCKET_ROOT_FOLDER, + _validate_root_path=False, + ) + return self._storage + + @staticmethod + def create() -> "DraftsConnector": + """Create a DraftsConnector with lazy-loaded S3 storage.""" + return DraftsConnector() + + def _parse_version(self, version: str) -> tuple[str, str]: + """Parse version in format 'version.revision' into (version, revision).""" + if "." 
not in version: + raise ValueError( + f"Version '{version}' should be in format 'version.revision'" + ) + parts = version.rsplit(".", 1) # Split on last dot + return parts[0], parts[1] + + def _get_draft_versions(self, key: str) -> list[str]: + """Get all draft revisions for a specific product. Key should be product name.""" + product = key + draft_folder_key = f"{product}/draft" + if not self.storage.exists(draft_folder_key): + return [] + + # Get all version folders + version_folders = self.storage.get_subfolders(draft_folder_key) + + # Get all version.revision combinations + versions = [] + for version_name in version_folders: + revision_folder_key = f"{product}/draft/{version_name}" + revision_folders = self.storage.get_subfolders(revision_folder_key) + for revision_name in revision_folders: + versions.append(f"{version_name}.{revision_name}") + return sorted(versions, reverse=True) + + def _download_file( + self, product_key: ProductKey, filepath: str, output_dir: Path | None = None + ) -> Path: + """Download a file from storage using PathedStorageConnector.""" + output_dir = output_dir or Path(".") + is_file_path = output_dir.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES + output_filepath = ( + output_dir / Path(filepath).name if not is_file_path else output_dir + ) + logger.info(f"Downloading {product_key}, {filepath} -> {output_filepath}") + + source_key = f"{product_key.path}/{filepath}" + if not self.storage.exists(source_key): + raise FileNotFoundError(f"File {source_key} not found") + + # Use PathedStorageConnector's pull method + self.storage.pull(key=source_key, destination_path=output_filepath) + return output_filepath + + def _pull( + self, + key: str, + version: str, + destination_path: Path, + *, + revision: str, + source_path: str = "", + dataset: str | None = None, + **kwargs, + ) -> dict: + """Pulls draft to destination path. + + `source_path` can be either a file or a directory. 
When it is a directory + the contents of that directory will be copied recursively to destination_path + """ + # key is product name, version is 'version.revision' format + product = key + + draft_key = DraftKey(product, version=version, revision=revision) + + path_prefix = dataset + "/" if dataset else "" + file_path = f"{path_prefix}{source_path}" + logger.info( + f"Pulling Draft for {draft_key}, path={file_path}, to={destination_path}" + ) + pulled_path = self._download_file( + draft_key, file_path, output_dir=destination_path + ) + return {"path": pulled_path} + + def pull_versioned( + self, key: str, version: str, destination_path: Path, **kwargs + ) -> dict: + version_parsed, revision_parsed = self._parse_version(version) + # Remove revision from kwargs to avoid duplicate parameter + kwargs.pop("revision", None) + return self._pull( + key, + version=version_parsed, + destination_path=destination_path, + revision=revision_parsed, + **kwargs, + ) + + def push_versioned(self, key: str, version: str, **kwargs) -> dict: + """Push data to drafts folder with version.revision structure. 
+ + Args: + key: Product name + version: Version in format 'version.revision' + **kwargs: Additional arguments including: + - source_path: Local path to push from + - acl: Access control level + - target_path: Optional specific target path within version folder + """ + product = key + draft_version, revision = self._parse_version(version) + + source_path = kwargs.get("source_path") + if not source_path: + raise ValueError("source_path is required for push_versioned") + + # Build the draft path: {product}/draft/{version}/{revision}/ + draft_folder_key = f"{product}/draft/{draft_version}/{revision}" + + # If target_path specified, use it; otherwise use the full folder + target_path = kwargs.get("target_path", "") + full_target_key = ( + f"{draft_folder_key}/{target_path}" if target_path else draft_folder_key + ) + + logger.info(f"Pushing to draft: {source_path} -> {full_target_key}") + + # Use PathedStorageConnector's push method + result = self.storage.push( + key=full_target_key, + filepath=source_path, + **{ + k: v + for k, v in kwargs.items() + if k not in ["source_path", "target_path"] + }, + ) + + return result + + def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: + logger.info(f"Listing versions for {key}") + return self._get_draft_versions(key) + + def get_latest_version(self, key: str, **kwargs) -> str: + return self.list_versions(key)[0] + + def version_exists(self, key: str, version: str, **kwargs) -> bool: + return version in self.list_versions(key) + + def data_local_sub_path( + self, key: str, *, version: str, revision: str, **kwargs + ) -> Path: + product = key + draft_version, draft_revision = self._parse_version(version) + return ( + Path("edm") + / "publishing" + / "datasets" + / product + / draft_version + / draft_revision + ) diff --git a/dcpy/lifecycle/builds/artifacts/drafts.py b/dcpy/lifecycle/builds/artifacts/drafts.py new file mode 100644 index 0000000000..4ff8e1580f --- /dev/null +++ 
def _revision_sort_key(revision: str) -> tuple[int, str]:
    """Order revisions by their leading number, falling back to the raw text.

    A plain string sort ranks "10-x" below "2-y"; comparing the numeric
    prefix keeps "latest revision" correct past nine revisions. Revisions
    without a numeric prefix sort before all numbered ones.
    """
    number = revision.split("-", maxsplit=1)[0]
    return (int(number) if number.isdigit() else -1, revision)


def get_dataset_versions(product: str) -> list[str]:
    """Get draft versions using drafts connector.

    Note: keys for drafts have both the dataset version (e.g. "24v1")
    and the revision, e.g. 1-my-draft. This returns just the versions,
    de-duplicated and sorted descending (string order).
    """
    draft_versions = get_drafts_default_connector().list_versions(
        product, sort_desc=True
    )

    # Split on the LAST dot, like every other 'version.revision' parse in this
    # module, so dotted dataset versions are not truncated at the first dot.
    versions = {
        version_revision.rsplit(".", maxsplit=1)[0]
        for version_revision in draft_versions
        if "." in version_revision
    }
    return sorted(versions, reverse=True)


def get_dataset_version_revisions(product: str, version: str) -> list[str]:
    """Get draft revisions for a specific version using drafts connector.

    e.g. return all revisions (2-my-other-draft, 1-my-draft) for a version,
    newest first (ordered by revision number, not by string comparison).
    """
    all_draft_versions = get_drafts_default_connector().list_versions(
        product, sort_desc=True
    )

    revisions = []
    for version_revision in all_draft_versions:
        if "." not in version_revision:
            continue
        draft_version, revision = version_revision.rsplit(".", maxsplit=1)
        if draft_version == version:
            revisions.append(revision)

    return sorted(revisions, key=_revision_sort_key, reverse=True)


def get_revision_label(product: str, version: str, revision_num: int) -> str:
    """Get draft revision label for a specific revision number using drafts connector.

    Returns:
        The text after the first "-" of the matching revision, or "" when the
        revision has no label.

    Raises:
        ValueError: If no revision of ``version`` starts with ``revision_num``.
    """
    revisions = get_dataset_version_revisions(product, version)
    wanted = str(revision_num)
    for revision in revisions:
        number, _, label = revision.partition("-")
        if number == wanted:
            return label

    raise ValueError(f"Draft revision {revision_num} not found in {revisions}")


def get_source_data_versions(
    product: str, version: str, revision_num: int
) -> pd.DataFrame:
    """Get source data versions for a draft.

    Pulls ``source_data_versions.csv`` for ``{version}.{revision_num}`` into a
    temporary directory and loads it before the directory is removed.
    """
    drafts_conn = get_drafts_default_connector()
    with TemporaryDirectory() as _dir:
        result = drafts_conn.pull_versioned(
            key=product,
            version=f"{version}.{revision_num}",
            source_path="source_data_versions.csv",
            destination_path=Path(_dir),
        )
        # Must be read inside the `with`: the file vanishes with the temp dir.
        return pd.read_csv(result.get("path"))


@dataclass
class _FullDraftKey:
    # Product name, e.g. "pluto".
    product: str
    # Dataset version, e.g. "25v3".
    dataset_version: str
    # Numeric part of the revision, e.g. 1.
    revision_num: int
    # Free-text part of the revision, e.g. "fix-bug"; "" when absent.
    revision_label: str

    @property
    def revision(self) -> str:
        """Return formatted revision string (e.g., '1' or '1-label')."""
        if self.revision_label:
            return f"{self.revision_num}-{self.revision_label}"
        return str(self.revision_num)

    def __str__(self):
        return f"{self.product}::{self.dataset_version}::{self.revision}"


def resolve_full_version(
    product: str,
    dataset_version: str,
    *,
    # Pass either the revision_num or the full_revision (num+label)
    revision_num: int | None = None,
    full_revision: str = "",
) -> _FullDraftKey:
    """Resolve full draft version information from either revision_num or full_revision.

    Args:
        product: Product name
        dataset_version: Version string (e.g., "25v3")
        revision_num: Revision number (mutually exclusive with full_revision)
        full_revision: Full revision string like "1-fix-bug" (mutually exclusive with revision_num)

    Returns:
        _FullDraftKey: Object containing dataset_version, revision_num, and revision_label

    Raises:
        ValueError: If neither or both revision_num and full_revision are
            provided, or if the number in ``full_revision`` is not an integer.
    """
    if not (revision_num or full_revision) or (revision_num and full_revision):
        raise ValueError(
            "Exactly one of revision_num or full_revision must be provided"
        )

    if revision_num:
        # Only the number is known; look the label up among existing drafts.
        return _FullDraftKey(
            product=product,
            dataset_version=dataset_version,
            revision_num=revision_num,
            revision_label=get_revision_label(product, dataset_version, revision_num),
        )

    # Split on the FIRST dash: labels may contain dashes themselves
    # ("1-fix-bug" -> 1, "fix-bug"). Splitting from the right would hand
    # "1-fix" to int() and raise.
    number, _, label = full_revision.partition("-")
    return _FullDraftKey(
        product=product,
        dataset_version=dataset_version,
        revision_num=int(number),
        revision_label=label,
    )


def get_metadata(product: str, full_version: str) -> BuildMetadata:
    """Get metadata for a draft.

    Args:
        product: Product name
        full_version: Full version string like "25v3.1-my-draft"
    """
    with TemporaryDirectory() as _dir:
        md_path = (
            get_drafts_default_connector()
            .pull_versioned(
                key=product,
                version=full_version,
                source_path="build_metadata.json",
                destination_path=Path(_dir),
            )
            .get("path")
        )
        # Context manager so the handle is closed before the temp dir is removed.
        with open(md_path) as md_file:
            return BuildMetadata(**yaml.safe_load(md_file))
def calculate_test_path():
    """Return a unique relative folder for one integration-test run."""
    run_id = f"{int(time.time())}_{str(uuid.uuid4())[:8]}"
    return Path("integration_tests") / "edm" / "connectors" / "drafts" / run_id


def _make_fake_draft_datasets():
    """Create test product, version, and revision combinations."""
    return [
        (product, version, revision)
        for product in ["housing_db", "pluto_changes"]
        for version in ["2024", "2025"]
        for revision in ["1", "2", "3"]
    ]


def _create_draft_files(conn: DraftsConnector, tmp_path: Path):
    """Create fake draft files in the storage."""
    # Files are written locally first, then pushed through the storage layer.
    staging_dir = tmp_path / "staging"
    staging_dir.mkdir(parents=True, exist_ok=True)

    for product, version, revision in _make_fake_draft_datasets():
        file_contents = {
            "draft_data.csv": (
                f"id,name,status\\n1,{product},draft\\n2,{product},reviewing"
            ),
            "draft_metadata.json": (
                f'{{"product": "{product}", "version": "{version}", "revision": "{revision}", "status": "draft"}}'
            ),
            "notes.txt": (
                f"Draft notes for {product} version {version} revision {revision}"
            ),
        }

        for filename, text in file_contents.items():
            local_file = staging_dir / f"{product}_{version}_{revision}_{filename}"
            local_file.write_text(text)

            # Draft layout: {product}/draft/{version}/{revision}/{filename}
            conn.storage.push(
                key=f"{product}/draft/{version}/{revision}/{filename}",
                filepath=str(local_file),
            )


@pytest.fixture
def local_drafts_conn(tmp_path):
    """Create a Drafts connector with local storage for testing."""
    # The local storage root has to exist before the connector is built.
    root_dir = tmp_path / "drafts"
    root_dir.mkdir(parents=True, exist_ok=True)

    # Local-directory backend stands in for S3.
    conn = DraftsConnector(
        storage=PathedStorageConnector.from_storage_kwargs(
            conn_type="edm.publishing.drafts.test",
            storage_backend=StorageType.LOCAL,
            local_dir=root_dir,
        )
    )
    _create_draft_files(conn, tmp_path)
    return conn


def _test_drafts_connector_interface(conn: DraftsConnector, tmp_path):
    """Test the basic drafts connector interface methods."""
    expected_drafts = _make_fake_draft_datasets()
    # e.g. [('housing_db', '2024', '1'), ('housing_db', '2024', '2'), ...]

    # Group the expected 'version.revision' identifiers by product.
    ids_by_product: dict[str, list[str]] = {}
    for product, version, revision in expected_drafts:
        ids_by_product.setdefault(product, []).append(f"{version}.{revision}")

    # Every seeded draft must be reported as existing.
    for product, version, revision in expected_drafts:
        version_revision = f"{version}.{revision}"
        assert conn.version_exists(product, version=version_revision), (
            f"Version {version_revision} should exist for product {product}"
        )

    # Listing returns version.revision identifiers, newest first.
    for product, draft_ids in ids_by_product.items():
        versions = conn.list_versions(product)
        expected_versions_list = sorted(draft_ids, reverse=True)
        assert versions == expected_versions_list, (
            f"Versions for {product} should match expected: {versions} vs {expected_versions_list}"
        )

    # The latest version is the greatest identifier.
    for product, draft_ids in ids_by_product.items():
        expected_latest = max(draft_ids)
        assert conn.get_latest_version(product) == expected_latest, (
            f"Latest version for {product} should be {expected_latest}"
        )

    # A single file can be pulled out of each draft.
    for product, version, revision in expected_drafts:
        version_revision = f"{version}.{revision}"
        destination_dir = tmp_path / "downloads" / product / version / revision
        destination_dir.mkdir(parents=True, exist_ok=True)

        pulled_path = conn.pull_versioned(
            key=product,
            version=version_revision,
            destination_path=destination_dir,
            source_path="draft_data.csv",
        ).get("path")

        assert pulled_path is not None, (
            f"Should return a path for {product}:{version_revision}"
        )
        pulled_file = Path(pulled_path)
        assert pulled_file.exists(), f"Downloaded file should exist: {pulled_path}"
        assert pulled_file.name == "draft_data.csv", (
            "Downloaded file should be named draft_data.csv"
        )

        # The seeded CSV embeds the product name.
        assert product in pulled_file.read_text(), (
            f"Downloaded content should contain product name {product}"
        )


def _test_drafts_connector_error_cases(conn: DraftsConnector, tmp_path):
    """Test error cases and edge conditions."""
    # Unknown product -> empty listing.
    missing_listing = conn.list_versions("nonexistent_product")
    assert missing_listing == [], "Non-existent product should return empty list"

    # Known product, unknown version.
    existing_product = "housing_db"
    assert not conn.version_exists(existing_product, "2099.99"), (
        "Non-existent version should return False"
    )

    # A version without a '.revision' part is rejected.
    with pytest.raises(
        ValueError, match="Version .* should be in format 'version.revision'"
    ):
        conn._parse_version("invalid_format")

    # Pulling a file that was never pushed.
    destination_dir = tmp_path / "downloads" / "test_error"
    destination_dir.mkdir(parents=True, exist_ok=True)

    with pytest.raises(FileNotFoundError, match="File .* not found"):
        conn.pull_versioned(
            key="housing_db",
            version="2024.1",
            destination_path=destination_dir,
            source_path="nonexistent_file.txt",
        )

    # No versions at all -> indexing the empty listing fails.
    with pytest.raises(IndexError):
        conn.get_latest_version("nonexistent_product")


def _test_drafts_connector_version_parsing(conn: DraftsConnector):
    """Test version parsing functionality."""
    # (input, expected version, expected revision); the last case checks that
    # the split happens on the final dot.
    cases = [
        ("2024.1", "2024", "1"),
        ("2024-beta.5", "2024-beta", "5"),
        ("v1.2.3.final", "v1.2.3", "final"),
    ]
    for raw, expected_version, expected_revision in cases:
        version, revision = conn._parse_version(raw)
        assert version == expected_version
        assert revision == expected_revision


def test_local_drafts_conn(local_drafts_conn: DraftsConnector, tmp_path):
    """Test the Drafts connector with local storage."""
    _test_drafts_connector_interface(local_drafts_conn, tmp_path)
    _test_drafts_connector_error_cases(local_drafts_conn, tmp_path)
    _test_drafts_connector_version_parsing(local_drafts_conn)
def _bucket() -> str:
    """Return the configured publishing bucket, failing if it is unset."""
    assert PUBLISHING_BUCKET, (
        "'PUBLISHING_BUCKET' must be defined to use edm.published connector"
    )
    return PUBLISHING_BUCKET


# This is a (hopefully) temporary hack while we think about
# distinguishing filepaths vs directories in the connector interface.
# A destination whose suffix is in this set is treated as a file path.
_TEMP_PUBLISHING_FILE_SUFFIXES = {
    ".zip",
    ".parquet",
    ".csv",
    ".pdf",
    ".xlsx",
    ".json",
    ".text",
}


class PublishedConnector(VersionedConnector, arbitrary_types_allowed=True):
    conn_type: str = "edm.publishing.published"
    _storage: PathedStorageConnector | None = None

    def __init__(self, storage: PathedStorageConnector | None = None, **kwargs):
        """Initialize PublishedConnector with optional storage."""
        super().__init__(**kwargs)
        if storage is None:
            return
        self._storage = storage

    @property
    def storage(self) -> PathedStorageConnector:
        """Lazy-loaded storage connector. Only initializes when first accessed."""
        if self._storage is not None:
            return self._storage

        # Nothing injected: build the S3-backed default on first use.
        self._storage = PathedStorageConnector.from_storage_kwargs(
            conn_type="edm.publishing.published",
            storage_backend=StorageType.S3,
            s3_bucket=_bucket(),
            root_folder=PUBLISHING_BUCKET_ROOT_FOLDER,
            _validate_root_path=False,
        )
        return self._storage

    @staticmethod
    def create() -> "PublishedConnector":
        """Create a PublishedConnector with lazy-loaded S3 storage."""
        return PublishedConnector()

    def _download_file(
        self, product_key, filepath: str, output_dir: Path | None = None
    ) -> Path:
        """Download one file from storage and return its local path.

        ``output_dir`` is treated as the target file itself when its suffix is
        in ``_TEMP_PUBLISHING_FILE_SUFFIXES``; otherwise the source file name
        is appended to it. Defaults to the current directory.

        Raises:
            FileNotFoundError: If the source key does not exist in storage.
        """
        output_filepath = output_dir or Path(".")
        if output_filepath.suffix not in _TEMP_PUBLISHING_FILE_SUFFIXES:
            output_filepath = output_filepath / Path(filepath).name
        logger.info(f"Downloading {product_key}, {filepath} -> {output_filepath}")

        source_key = f"{product_key.path}/{filepath}"
        if not self.storage.exists(source_key):
            raise FileNotFoundError(f"File {source_key} not found")

        self.storage.pull(key=source_key, destination_path=output_filepath)
        return output_filepath

    def _get_published_versions(
        self, key: str, exclude_latest: bool = True
    ) -> list[str]:
        """Get all published versions for a product using PathedStorageConnector.

        Returns the sub-folder names of ``{key}/publish`` sorted descending,
        optionally without the ``latest`` folder; empty when the publish
        folder does not exist.
        """
        publish_folder_key = f"{key}/publish"
        if not self.storage.exists(publish_folder_key):
            return []

        found = self.storage.get_subfolders(publish_folder_key)
        if exclude_latest:
            found = [name for name in found if name != "latest"]
        return sorted(found, reverse=True)

    def _pull(
        self,
        key: str,
        version: str,
        destination_path: Path,
        *,
        filepath: str = "",
        dataset: str | None = None,
        **kwargs,
    ) -> dict:
        """Download ``filepath`` (optionally under ``dataset``/) for a published version."""
        relative_path = f"{dataset}/{filepath}" if dataset else filepath
        local_path = self._download_file(
            PublishKey(key, version),
            relative_path,
            output_dir=destination_path,
        )
        return {"path": local_path}

    def pull_versioned(
        self, key: str, version: str, destination_path: Path, **kwargs
    ) -> dict:
        """Pull from a published version; see ``_pull`` for supported kwargs."""
        return self._pull(key, version, destination_path, **kwargs)

    def push_versioned(self, key: str, version: str, **kwargs) -> dict:
        """Push data to published folder with version structure.

        Args:
            key: Product name
            version: Version string (e.g., '1.0', '1.0.1', 'latest')
            **kwargs: Additional arguments including:
                - source_path: Local path to push from
                - acl: Access control level
                - target_path: Optional specific target path within version folder

        Raises:
            ValueError: If ``source_path`` is missing or empty.
        """
        source_path = kwargs.get("source_path")
        if not source_path:
            raise ValueError("source_path is required for push_versioned")

        # Published layout: {product}/publish/{version}/[target_path]
        full_target_key = f"{key}/publish/{version}"
        target_path = kwargs.get("target_path", "")
        if target_path:
            full_target_key = f"{full_target_key}/{target_path}"

        logger.info(f"Pushing to published: {source_path} -> {full_target_key}")

        # Everything except the two path arguments goes to the storage layer.
        extra = {
            name: value
            for name, value in kwargs.items()
            if name not in ["source_path", "target_path"]
        }
        return self.storage.push(key=full_target_key, filepath=source_path, **extra)

    def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
        """List published versions; kwargs (e.g. exclude_latest) go to the lookup."""
        found = self._get_published_versions(key, **kwargs)
        return sorted(found, reverse=sort_desc)

    def get_latest_version(self, key: str, **kwargs) -> str:
        """Return the first entry of the descending listing (IndexError if none)."""
        return self.list_versions(key)[0]

    def version_exists(self, key: str, version: str, **kwargs) -> bool:
        """Return True when ``version`` is among the listed versions of ``key``."""
        return version in self.list_versions(key)

    def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path:  # type: ignore[override]
        """Relative local folder for a published product version."""
        return Path("edm") / "publishing" / "datasets" / key / version
dcpy.lifecycle.builds import connector as build_conns +from dcpy.lifecycle.builds.artifacts.drafts import ( + get_metadata, + get_dataset_version_revisions, + resolve_full_version, +) +from dcpy.lifecycle.builds.connector import ( + get_published_default_connector, +) +from dcpy.models.connectors.edm.publishing import PublishKey +from dcpy.models.lifecycle.builds import BuildMetadata +from dcpy.utils import versions +from dcpy.utils.logging import logger + +# connector = get_published_default_connector() + +BUILD_METADATA_FILE = "build_metadata.json" + + +def get_latest_version(product: str) -> str | None: + return get_published_default_connector().get_latest_version(product) + + +def get_versions(product: str, exclude_latest: bool = True) -> list[str]: + """Get published versions using published connector.""" + return get_published_default_connector().list_versions( + product, sort_desc=True, exclude_latest=exclude_latest + ) + + +def get_previous_version( + product: str, version: str | versions.Version +) -> versions.Version: + """Get previous version using published connector.""" + # Normalize version input + match version: + case str(): + version_obj = versions.parse(version) + case versions.Version(): + version_obj = version + + # Get published versions using connector + published_version_strings = get_versions(product) + logger.info(f"Published versions of {product}: {published_version_strings}") + + published_versions = [versions.parse(v) for v in published_version_strings] + published_versions.sort() + + if len(published_versions) == 0: + raise LookupError(f"No published versions found for {product}") + + if version_obj in published_versions: + index = published_versions.index(version_obj) + if index == 0: + raise LookupError( + f"{product} - {version} is the oldest published version, and has no previous" + ) + else: + return published_versions[index - 1] + else: + latest = published_versions[-1] + if version_obj > latest: + return latest + else: + raise 
LookupError( + f"{product} - {version} is not published and appears to be 'older' than latest published version. Cannot determine previous." + ) + + +def fetch_metadata_file(product: str, folder_version: str, out_path: Path): + """Pulls the metadata file for a product. + + Note: we pass the 'folder_version', which might be a normal version, or 'latest' + """ + meta_path = build_conns.get_published_default_connector().pull_versioned( + key=product, + version=folder_version, + filepath=BUILD_METADATA_FILE, + destination_path=out_path, + ) + return Path(meta_path.get("path")) / BUILD_METADATA_FILE + + +def fetch_version_from_metadata(product: str, folder_version: str): + with TemporaryDirectory() as tmp_dir_str: + md_dir = Path(tmp_dir_str) + fetch_metadata_file(product, folder_version, md_dir) + return BuildMetadata( + **yaml.safe_load(open(Path(md_dir) / BUILD_METADATA_FILE).read()) + ).version + + +def validate_or_patch_version( + product: str, + version: str, + is_patch: bool, +) -> str: + """Given input arguments, determine the publish version, bumping it if necessary.""" + published_versions = get_versions(product=product) + + # Filters existing published versions for same version (patched or non-patched) + published_same_version = versions.group_versions_by_base( + version, published_versions + ) + version_already_published = version in published_same_version + + if version_already_published: + if is_patch: + latest_version = published_same_version[-1] + patched_version = versions.bump( + previous_version=latest_version, + bump_type=versions.VersionSubType.patch, + bump_by=1, + ).label + assert patched_version not in published_versions # sanity check + return patched_version + else: + raise ValueError( + f"Version '{version}' already exists in published folder and patch wasn't selected" + ) + + logger.info(f"Predicted version in publish folder: {version}") + return version + + +def patch_metadata( + product: str, + draft_version_revision: str, + patch_version: 
str, + publish_version: str, + tmp_dir: Path, + published_conn, + acl: str = "", +) -> None: + """Update metadata with patched version and upload to published.""" + logger.info( + f"Patching metadata version {draft_version_revision} with new version {publish_version}" + ) + md = get_metadata(product, draft_version_revision) + md.version = patch_version + patched_md_path = tmp_dir / "build_metadata_patched.json" + patched_md_path.write_text(json.dumps(md.model_dump(mode="json"), indent=4)) + + published_conn.push_versioned( + key=product, + version=publish_version, + source_path=str(patched_md_path), + target_path="build_metadata.json", + acl=acl, + ) + + +def _copy_draft_to_published( + product: str, + draft_version_revision: str, + publish_version: str, + acl: str = "", + is_patch: bool = False, + copy_to_latest: bool = False, +) -> None: + """Copy all files from draft to published using connectors.""" + drafts_conn = build_conns.get_drafts_default_connector() + published_conn = build_conns.get_published_default_connector() + with TemporaryDirectory() as tmp_dir_str: + tmp_dir = Path(tmp_dir_str) + + # Pull entire draft folder to temp + drafts_conn.pull_versioned( + key=product, version=draft_version_revision, destination_path=tmp_dir + ) + + # Push entire folder to published + published_conn.push_versioned( + key=product, version=publish_version, source_path=str(tmp_dir), acl=acl + ) + logger.info( + f"Copied draft {product}/{draft_version_revision} to published {product}/{publish_version}" + ) + if is_patch: + logger.info( + f"Also patching published metadata versions: {draft_version_revision} -> {publish_version}" + ) + patch_metadata( + product=product, + draft_version_revision=draft_version_revision, + patch_version=publish_version, + publish_version=publish_version, + tmp_dir=tmp_dir, + published_conn=published_conn, + acl=acl, + ) + + # TODO: TEMPORARY HACK - copy_to_latest is a workaround until we iron out + # proper s3/blob storage copying mechanisms. 
This should be refactored + # to use proper connector optimization when available. + if copy_to_latest: + published_conn.push_versioned( + key=product, + version="latest", + source_path=str(tmp_dir), + acl=acl, + ) + logger.info( + f"Copied {product}/{publish_version} to latest folder (temporary hack)" + ) + if is_patch: + logger.info(f"Also patching latest metadata to new version {is_patch}") + patch_metadata( + product=product, + draft_version_revision=draft_version_revision, + patch_version=publish_version, + publish_version="latest", # Updated for latest + tmp_dir=tmp_dir, + published_conn=published_conn, + acl=acl, + ) + + +def publish( + product: str, + version: str, + draft_revision_num: int = 0, + acl: str = "", + latest: bool = False, + is_patch: bool = False, + metadata_file_dir: Path | None = None, +) -> PublishKey: + """ + Publishes a draft to the published datastore using connectors. + + Args: + product: Data product name + version: Data product release version + draft_revision_num: Draft revision number to publish (if 0, uses latest) + acl: Access control level for uploaded files + latest: Whether to publish to latest folder as well + is_patch: Whether to create a patched version if version already exists + metadata_file_dir: dir to download metadata after publishing + + Returns: + PublishKey: The key for the published version + + Raises: + FileNotFoundError: If the draft does not exist. + ValueError: If version validation fails. 
+ """ + logger.info(f'Publishing {product} version {version} with ACL "{acl}"') + + # Get the full draft version information + if draft_revision_num == 0: + # If no specific revision requested, get the latest revision + revisions = get_dataset_version_revisions(product, version) + assert revisions, f"No revisions for {product}, {version}" + latest_revision = revisions[0] + draft_key = resolve_full_version( + product, version, full_revision=latest_revision + ) + else: + draft_key = resolve_full_version( + product, version, revision_num=draft_revision_num + ) + + # Validate and determine final version to publish + publish_version = validate_or_patch_version( + product, draft_key.dataset_version, is_patch + ) + logger.info(f'Publishing {draft_key} as version {publish_version} with ACL "{acl}"') + + # When "latest" is requested, determine if that's possible + if latest: + latest_published_version = ( + build_conns.get_published_default_connector().get_latest_version(product) + ) + if latest_published_version: + after_or_equals_latest_version = ( + versions.is_newer( + version_1=publish_version, version_2=latest_published_version + ) + or publish_version == latest_published_version + ) + else: + after_or_equals_latest_version = None + + if after_or_equals_latest_version or latest_published_version is None: + logger.info(f"Updated 'latest' folder with version {publish_version}") + else: + raise ValueError( + f"Unable to update 'latest' folder: the version {publish_version} is older than 'latest' ({latest_published_version})" + ) + + _copy_draft_to_published( + product, + draft_version_revision=f"{draft_key.dataset_version}.{draft_key.revision}", + publish_version=publish_version, + acl=acl, + copy_to_latest=latest, + is_patch=(version != publish_version), + ) + + # Download metadata if requested + if metadata_file_dir: + md_file_path = fetch_metadata_file( + product, folder_version=publish_version, out_path=metadata_file_dir + ) + logger.info(f"Pulled build_metadata.json 
def calculate_test_path():
    """Return a unique relative folder for one integration-test run."""
    return (
        Path("integration_tests")
        / "edm"
        / "connectors"
        / "published"
        / f"{int(time.time())}_{str(uuid.uuid4())[:8]}"
    )


def _make_fake_published_datasets():
    """Create test product and version combinations."""
    products = ["nycha_developments", "pluto"]
    versions = ["24v1", "24v2", "25v1"]
    return [(product, version) for product in products for version in versions]


def _create_published_files(conn: PublishedConnector, tmp_path: Path):
    """Create fake published dataset files in the storage."""
    # Files are written locally first, then pushed through the storage layer.
    local_staging = tmp_path / "staging"
    local_staging.mkdir(parents=True, exist_ok=True)

    for product, version in _make_fake_published_datasets():
        file_contents = {
            "data.csv": f"id,name,value\\n1,{product},test\\n2,{product},data",
            "metadata.json": (
                f'{{"product": "{product}", "version": "{version}", "created": "2024-01-01"}}'
            ),
            "readme.txt": f"README for {product} version {version}",
        }

        for filename, text in file_contents.items():
            local_file = local_staging / f"{product}_{version}_{filename}"
            local_file.write_text(text)

            # Published layout: {product}/publish/{version}/{filename}
            storage_key = f"{product}/publish/{version}/{filename}"
            conn.storage.push(key=storage_key, filepath=str(local_file))


@pytest.fixture
def local_published_conn(tmp_path):
    """Create a Published connector with local storage for testing."""
    # The local storage root has to exist before the connector is built.
    root_dir = tmp_path / "published"
    root_dir.mkdir(parents=True, exist_ok=True)

    # Local-directory backend stands in for S3.
    storage = PathedStorageConnector.from_storage_kwargs(
        conn_type="edm.publishing.published.test",
        storage_backend=StorageType.LOCAL,
        local_dir=root_dir,
    )
    conn = PublishedConnector(storage=storage)
    _create_published_files(conn, tmp_path)
    return conn


def _test_published_connector_interface(conn: PublishedConnector, tmp_path):
    """Test the basic published connector interface methods."""
    expected_published = _make_fake_published_datasets()
    # e.g. [('nycha_developments', '24v1'), ('nycha_developments', '24v2'), ...]

    expected_versions_by_product: dict[str, list[str]] = {}
    for product, version in expected_published:
        expected_versions_by_product.setdefault(product, []).append(version)

    # Check that all versions exist
    for product, version in expected_published:
        assert conn.version_exists(product, version=version), (
            f"Version {version} should exist for product {product}"
        )

    # Test listing versions (newest first by default)
    for product, expected_versions in expected_versions_by_product.items():
        versions = conn.list_versions(product)
        expected_versions_list = sorted(expected_versions, reverse=True)
        assert versions == expected_versions_list, (
            f"Versions for {product} should match expected: {versions} vs {expected_versions_list}"
        )

    # Test get_latest_version
    for product, expected_versions in expected_versions_by_product.items():
        latest = conn.get_latest_version(product)
        expected_latest = max(expected_versions)
        assert latest == expected_latest, (
            f"Latest version for {product} should be {expected_latest}"
        )

    # Test pull_versioned
    for product, version in expected_published:
        destination_dir = tmp_path / "downloads" / product / version
        destination_dir.mkdir(parents=True, exist_ok=True)

        # Pull a specific file from the published dataset
        result = conn.pull_versioned(
            key=product,
            version=version,
            destination_path=destination_dir,
            filepath="data.csv",
        )

        pulled_path = result.get("path")
        assert pulled_path is not None, f"Should return a path for {product}:{version}"
        assert Path(pulled_path).exists(), (
            f"Downloaded file should exist: {pulled_path}"
        )
        assert Path(pulled_path).name == "data.csv", (
            "Downloaded file should be named data.csv"
        )

        # The seeded CSV embeds the product name.
        content = Path(pulled_path).read_text()
        assert product in content, (
            f"Downloaded content should contain product name {product}"
        )


def _test_published_connector_error_cases(conn: PublishedConnector, tmp_path):
    """Test error cases and edge conditions, plus a push/pull round trip."""
    # Test non-existent product
    assert conn.list_versions("nonexistent_product") == [], (
        "Non-existent product should return empty list"
    )

    # Test non-existent version for existing product
    existing_product = "nycha_developments"
    assert not conn.version_exists(existing_product, "nonexistent_version"), (
        "Non-existent version should return False"
    )

    # Test pulling non-existent file
    destination_dir = tmp_path / "downloads" / "test_error"
    destination_dir.mkdir(parents=True, exist_ok=True)

    with pytest.raises(FileNotFoundError, match="File .* not found"):
        conn.pull_versioned(
            key="nycha_developments",
            # A version that really exists, so the failure is caused by the
            # missing file rather than by a missing version folder.
            version="24v1",
            destination_path=destination_dir,
            filepath="nonexistent_file.txt",
        )

    # Test get_latest_version on empty product list
    with pytest.raises(IndexError):
        conn.get_latest_version("nonexistent_product")

    # Test push_versioned functionality
    test_dir = tmp_path / "test_version_dir"
    test_dir.mkdir(exist_ok=True)
    test_file = test_dir / "test_file.txt"
    test_file.write_text("test content")

    # Push a directory to a new version
    result = conn.push_versioned(
        "test_product", "test_version", source_path=str(test_dir)
    )

    # Verify the push was successful
    assert result is not None, "push_versioned should return a result"

    # Verify the version now exists
    assert conn.version_exists("test_product", "test_version"), (
        "Version should exist after pushing"
    )

    # Verify the pushed content can be retrieved (this check used to be
    # repeated verbatim a second time; once is enough).
    pull_destination = tmp_path / "pulled_test"
    pull_destination.mkdir(exist_ok=True)

    pull_result = conn.pull_versioned(
        key="test_product",
        version="test_version",
        destination_path=pull_destination,
        filepath="test_file.txt",
    )

    pulled_path = pull_result.get("path")
    assert pulled_path is not None, "Should return a path for pulled file"
    assert Path(pulled_path).exists(), "Pulled file should exist"

    # Verify content matches
    pulled_content = Path(pulled_path).read_text()
    assert pulled_content == "test content", "Pulled content should match original"


def test_local_published_conn(local_published_conn: PublishedConnector, tmp_path):
    """Test the Published connector with local storage."""
    _test_published_connector_interface(local_published_conn, tmp_path)
    _test_published_connector_error_cases(local_published_conn, tmp_path)
dcpy.utils.logging import logger + + +def _bucket() -> str: + assert PUBLISHING_BUCKET, ( + "'PUBLISHING_BUCKET' must be defined to use edm.publishing.gis connector" + ) + return PUBLISHING_BUCKET + + +class GisDatasetsConnector(VersionedConnector, arbitrary_types_allowed=True): + conn_type: str = "edm.publishing.gis" + _storage: PathedStorageConnector | None = None + + def __init__(self, storage: PathedStorageConnector | None = None, **kwargs): + """Initialize GisDatasetsConnector with optional storage.""" + super().__init__(**kwargs) + if storage is not None: + self._storage = storage + + @property + def storage(self) -> PathedStorageConnector: + """Lazy-loaded storage connector. Only initializes when first accessed.""" + if self._storage is None: + self._storage = PathedStorageConnector.from_storage_kwargs( + conn_type="edm.publishing.gis", + storage_backend=StorageType.S3, + s3_bucket=_bucket(), + root_folder="datasets", + _validate_root_path=False, + ) + return self._storage + + @staticmethod + def create() -> "GisDatasetsConnector": + """Create a GisDatasetsConnector with lazy-loaded S3 storage.""" + return GisDatasetsConnector() + + def _gis_dataset_path(self, name: str, version: str) -> str: + """Get the path to a GIS dataset file.""" + return f"{name}/{version}/{name}.zip" + + def _assert_gis_dataset_exists(self, name: str, version: str): + """Assert that a GIS dataset exists in storage.""" + version = version.upper() + path_key = self._gis_dataset_path(name, version) + if not self.storage.exists(path_key): + raise FileNotFoundError(f"GIS dataset {name} has no version {version}") + + def _get_gis_dataset_versions( + self, dataset_name: str, sort_desc: bool = True + ) -> list[str]: + """ + Get all versions of GIS-published dataset in storage.
+ """ + gis_version_formats = [r"^\d{2}[A-Z]$", r"^\d{8}$"] + subfolders = [] + matched_formats = set() + + # Check if dataset folder exists + if not self.storage.exists(dataset_name): + raise AssertionError(f"No Dataset named {dataset_name} found.") + + # Get subfolders for the dataset + folder_names = self.storage.get_subfolders(dataset_name) + + for folder_name in folder_names: + for pattern in gis_version_formats: + if re.match(pattern, folder_name): + subfolders.append(folder_name) + matched_formats.add(pattern) + break + + if subfolders and len(matched_formats) > 1: + raise ValueError( + f"Multiple version formats found for gis dataset {dataset_name}. Cannot determine latest version" + ) + return sorted(subfolders, reverse=sort_desc) + + def _get_latest_gis_dataset_version(self, dataset_name: str) -> str: + """ + Get latest version of GIS-published dataset in storage. + """ + versions = self._get_gis_dataset_versions(dataset_name) + if not versions: + raise FileNotFoundError(f"No versions found for GIS dataset {dataset_name}") + version = versions[0] + self._assert_gis_dataset_exists(dataset_name, version) + return version + + def _download_gis_dataset( + self, dataset_name: str, version: str, target_folder: Path + ): + """ + Download GIS-published dataset from storage to target folder. 
+ """ + version = version.upper() + if not target_folder.is_dir(): + raise ValueError(f"Target folder '{target_folder}' is not a directory") + + self._assert_gis_dataset_exists(dataset_name, version) + + source_key = self._gis_dataset_path(dataset_name, version) + file_path = target_folder / f"{dataset_name}.zip" + + # Use PathedStorageConnector's pull method which handles all storage types + self.storage.pull(key=source_key, destination_path=file_path) + + return file_path + + def pull_versioned( + self, key: str, version: str, destination_path: Path, **kwargs + ) -> dict: + pulled_path = self._download_gis_dataset( + dataset_name=key, version=version, target_folder=destination_path + ) + return {"path": pulled_path} + + def push_versioned(self, key: str, version: str, **kwargs) -> dict: + raise PermissionError( + "Currently, only GIS team pushes to edm-publishing/datasets" + ) + + def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: + logger.info(f"Listing versions for {key}") + return self._get_gis_dataset_versions(key, sort_desc=sort_desc) + + def get_latest_version(self, key: str, **kwargs) -> str: + return self._get_latest_gis_dataset_version(key) + + def version_exists(self, key: str, version: str, **kwargs) -> bool: + return version in self.list_versions(key) diff --git a/dcpy/test_integration/connectors/edm/test_gis.py b/dcpy/test_integration/connectors/edm/test_gis.py new file mode 100644 index 0000000000..f5a5ccaeeb --- /dev/null +++ b/dcpy/test_integration/connectors/edm/test_gis.py @@ -0,0 +1,160 @@ +import time +import uuid +from pathlib import Path + +import pytest + +from dcpy.connectors.edm.gis import GisDatasetsConnector +from dcpy.connectors.hybrid_pathed_storage import ( + PathedStorageConnector, + StorageType, +) + + +def calculate_test_path(): + return ( + Path("integration_tests") + / "edm" + / "connectors" + / "gis" + / f"{int(time.time())}_{str(uuid.uuid4())[:8]}" + ) + + +def _make_fake_gis_datasets(): + 
"""Create test dataset combinations.""" + datasets = ["dcp_mandatory_inclusionary_housing", "dof_dtm_condos"] + versions = ["20241201", "20241215"] # Date format versions + combos = [] + for ds in datasets: + for v in versions: + combos.append((ds, v)) + return combos + + +def _create_gis_files(conn: GisDatasetsConnector, tmp_path: Path): + """Create fake GIS dataset files in the storage.""" + # Create temporary files locally then push them to storage + local_staging = tmp_path / "staging" + local_staging.mkdir(parents=True, exist_ok=True) + + for dataset, version in _make_fake_gis_datasets(): + # Create local zip file + local_zip = local_staging / f"{dataset}.zip" + local_zip.write_text(f"Mock GIS data for {dataset} version {version}") + + # Push to storage using the connector's interface + storage_key = f"{dataset}/{version}/{dataset}.zip" + conn.storage.push(key=storage_key, filepath=str(local_zip)) + + +@pytest.fixture +def local_gis_conn(tmp_path): + """Create a GIS connector with local storage for testing.""" + # Create the datasets directory first + datasets_dir = tmp_path / "datasets" + datasets_dir.mkdir(parents=True, exist_ok=True) + + # Create connector with local storage instead of S3 + storage = PathedStorageConnector.from_storage_kwargs( + conn_type="edm.publishing.gis.test", + storage_backend=StorageType.LOCAL, + local_dir=datasets_dir, # Note: GIS uses 'datasets' folder + ) + conn = GisDatasetsConnector(storage=storage) + _create_gis_files(conn, tmp_path) + return conn + + +def _test_gis_connector_interface(conn: GisDatasetsConnector, tmp_path): + """Test the basic GIS connector interface methods.""" + expected_datasets_versions = _make_fake_gis_datasets() + # e.g. [('dcp_mandatory_inclusionary_housing', '20241201'), ...] 
+ + expected_unique_datasets = {ds for ds, _ in expected_datasets_versions} + expected_versions_by_dataset: dict[str, list[str]] = {} + for ds, v in expected_datasets_versions: + if ds not in expected_versions_by_dataset: + expected_versions_by_dataset[ds] = [] + expected_versions_by_dataset[ds].append(v) + + # Check that all versions exist + for ds, v in expected_datasets_versions: + assert conn.version_exists(ds, version=v), ( + f"Version {v} should exist for dataset {ds}" + ) + + # Test listing versions + for dataset in expected_unique_datasets: + versions = conn.list_versions(dataset) + expected_versions = sorted(expected_versions_by_dataset[dataset], reverse=True) + assert versions == expected_versions, ( + f"Versions for {dataset} should match expected" + ) + + # Test get_latest_version + for dataset in expected_unique_datasets: + latest = conn.get_latest_version(dataset) + expected_latest = max(expected_versions_by_dataset[dataset]) + assert latest == expected_latest, ( + f"Latest version for {dataset} should be {expected_latest}" + ) + + # Test pull_versioned + for dataset, version in expected_datasets_versions: + destination_dir = tmp_path / "downloads" / dataset / version + destination_dir.mkdir(parents=True, exist_ok=True) + + result = conn.pull_versioned( + key=dataset, version=version, destination_path=destination_dir + ) + + pulled_path = result.get("path") + assert pulled_path is not None, f"Should return a path for {dataset}:{version}" + assert Path(pulled_path).exists(), ( + f"Downloaded file should exist: {pulled_path}" + ) + assert Path(pulled_path).name == f"{dataset}.zip", ( + f"Downloaded file should be named {dataset}.zip" + ) + + +def _test_gis_connector_push_restriction(conn: GisDatasetsConnector, tmp_path): + """Test that pushing is restricted (only GIS team can push).""" + test_file = tmp_path / "test.zip" + test_file.touch() + + with pytest.raises(PermissionError, match="Currently, only GIS team pushes"): + 
conn.push_versioned(key="test_dataset", version="20241201", filepath=test_file) + + +def _test_gis_connector_error_cases(conn: GisDatasetsConnector, tmp_path): + """Test error cases and edge conditions.""" + # Test non-existent dataset - version_exists also throws due to the assertion + with pytest.raises( + AssertionError, match="No Dataset named nonexistent_dataset found" + ): + conn.version_exists("nonexistent_dataset", "20241201") + + with pytest.raises( + AssertionError, match="No Dataset named nonexistent_dataset found" + ): + conn.list_versions("nonexistent_dataset") + + with pytest.raises( + AssertionError, match="No Dataset named nonexistent_dataset found" + ): + conn.get_latest_version("nonexistent_dataset") + + # Test non-existent version for existing dataset + existing_dataset = "dcp_mandatory_inclusionary_housing" + assert not conn.version_exists( + existing_dataset, "20990101" + ) # Future date that doesn't exist + + +def test_local_gis_conn(local_gis_conn: GisDatasetsConnector, tmp_path): + """Test the GIS connector with local storage.""" + _test_gis_connector_interface(local_gis_conn, tmp_path) + _test_gis_connector_push_restriction(local_gis_conn, tmp_path) + _test_gis_connector_error_cases(local_gis_conn, tmp_path) From 4e8b20eb311d3d7c26273dd55c710d22d5e9333c Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 16 Jan 2026 13:16:31 -0500 Subject: [PATCH 10/13] Add new connectors to the registry --- dcpy/lifecycle/connector_registry.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dcpy/lifecycle/connector_registry.py b/dcpy/lifecycle/connector_registry.py index 431c3e7c27..ce2e3404d0 100644 --- a/dcpy/lifecycle/connector_registry.py +++ b/dcpy/lifecycle/connector_registry.py @@ -1,24 +1,24 @@ from dcpy import configuration from dcpy.configuration import ( SFTP_HOST, - SFTP_USER, SFTP_PORT, SFTP_PRIVATE_KEY_PATH, + SFTP_USER, ) -from dcpy.connectors.edm import publishing +from dcpy.connectors import 
filesystem, ingest_datastore, s3, sftp, web +from dcpy.connectors.edm import builds, drafts, gis, published from dcpy.connectors.edm.bytes import BytesConnector from dcpy.connectors.edm.open_data_nyc import OpenDataConnector +from dcpy.connectors.esri.arcgis_feature_service import ArcGISFeatureServiceConnector from dcpy.connectors.hybrid_pathed_storage import ( - StorageType, PathedStorageConnector, + StorageType, ) -from dcpy.connectors.socrata.connector import SocrataConnector -from dcpy.connectors.esri.arcgis_feature_service import ArcGISFeatureServiceConnector -from dcpy.connectors import filesystem, web, s3, ingest_datastore, sftp from dcpy.connectors.registry import ( - ConnectorRegistry, Connector, + ConnectorRegistry, ) +from dcpy.connectors.socrata.connector import SocrataConnector from dcpy.utils.logging import logger connectors = ConnectorRegistry[Connector]() @@ -60,10 +60,10 @@ def _set_default_connectors(): conns = [ recipes_datasets, recipes_raw, - publishing.DraftsConnector(), - publishing.PublishedConnector(), + drafts.DraftsConnector.create(), + published.PublishedConnector.create(), BytesConnector(), - publishing.GisDatasetsConnector(), + gis.GisDatasetsConnector.create(), SocrataConnector(), OpenDataConnector(), ArcGISFeatureServiceConnector(), @@ -71,7 +71,7 @@ def _set_default_connectors(): [web.WebConnector(), "api"], [filesystem.Connector(), "local_file"], [s3.S3Connector(), "s3"], - [publishing.BuildsConnector(), "edm.publishing.builds"], + [builds.BuildsConnector.create(), "edm.publishing.builds"], [ sftp.SFTPConnector( hostname=SFTP_HOST, From 74bd2a0a194fe0540f7bc97bf3b580a645a3f4ae Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 16 Jan 2026 13:14:35 -0500 Subject: [PATCH 11/13] Add a CLI in lifecycle --- dcpy/lifecycle/builds/_cli.py | 5 +- dcpy/lifecycle/builds/artifacts/_cli.py | 102 ++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 dcpy/lifecycle/builds/artifacts/_cli.py diff --git 
a/dcpy/lifecycle/builds/_cli.py b/dcpy/lifecycle/builds/_cli.py index a2c289305d..967413173f 100644 --- a/dcpy/lifecycle/builds/_cli.py +++ b/dcpy/lifecycle/builds/_cli.py @@ -1,11 +1,12 @@ import typer - +from dcpy.lifecycle.builds.artifacts._cli import app as artifacts_app +from dcpy.lifecycle.builds.build import app as build_app from dcpy.lifecycle.builds.load import app as load_app from dcpy.lifecycle.builds.plan import app as plan_app -from dcpy.lifecycle.builds.build import app as build_app app = typer.Typer() app.add_typer(plan_app, name="plan") app.add_typer(build_app, name="build") app.add_typer(load_app, name="load") +app.add_typer(artifacts_app, name="artifacts") diff --git a/dcpy/lifecycle/builds/artifacts/_cli.py b/dcpy/lifecycle/builds/artifacts/_cli.py new file mode 100644 index 0000000000..2b61de45aa --- /dev/null +++ b/dcpy/lifecycle/builds/artifacts/_cli.py @@ -0,0 +1,102 @@ +from pathlib import Path + +import typer + +from dcpy.lifecycle.builds.artifacts import builds, published + +app = typer.Typer() + +# Create two separate apps for the different artifact types +builds_app = typer.Typer() +app.add_typer(builds_app, name="builds") + +drafts_app = typer.Typer() +app.add_typer(drafts_app, name="drafts") + + +@builds_app.command("upload") +def builds_upload( + build_dir: Path = typer.Argument( + Path("output"), help="Path to local output folder" + ), + product: str = typer.Option(..., "-p", "--product", help="Name of data product"), + build_note: str = typer.Option(None, "-b", "--build", help="Build note"), + acl: str = typer.Option( + "public-read", + "-a", + "--acl", + help="Access control level for uploaded files. 
S3 only", + ), +): + """Upload build artifacts using the builds connector.""" + builds.upload(build_dir, product, build_note, acl) + + +@builds_app.command("promote_to_draft") +def builds_promote_to_draft( + product: str = typer.Option(..., "-p", "--product", help="Data product name"), + build: str = typer.Option(..., "-b", "--build", help="Label of build"), + draft_summary: str = typer.Option( + "", "-ds", "--draft-summary", help="Draft description" + ), + acl: str = typer.Option( + "public-read", + "-a", + "--acl", + help="Access control level for uploaded files. S3 only.", + ), + download_metadata: bool = typer.Option( + False, "-m", "--download-metadata", help="Download metadata after promotion" + ), +): + """Promote build to draft using the builds connector.""" + builds.promote_to_draft( + product=product, + build=build, + acl=acl, + draft_revision_summary=draft_summary, + metadata_file_dir=Path("./") if download_metadata else None, + ) + + +@drafts_app.command("publish") +def drafts_publish( + product: str = typer.Option(..., "-p", "--product", help="Data product name"), + version: str = typer.Option( + ..., "-v", "--version", help="Data product release version" + ), + draft_revision_num: int = typer.Option( + None, + "-dn", + "--draft-number", + help="Draft revision number to publish. 
If blank, will use latest draft", + ), + acl: str = typer.Option( + "public-read", "-a", "--acl", help="Access control level for uploaded files" + ), + latest: bool = typer.Option( + False, "-l", "--latest", help="Publish to latest folder as well" + ), + is_patch: bool = typer.Option( + False, + "-ip", + "--is-patch", + help="Create a patched version if version already exists", + ), + metadata_file_dir: Path | None = typer.Option( + None, + "-md", + "--metadata-file-dir", + help="Download metadata from publish folder after publishing", + ), +): + """Publish draft to published using the drafts connector.""" + published.publish( + product=product, + version=version, + draft_revision_num=draft_revision_num, + acl=acl, + latest=latest, + is_patch=is_patch, + metadata_file_dir=metadata_file_dir, + ) From 8258bb33019ebfad0ad53e11e008a7b44ffd93bc Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Sat, 17 Jan 2026 17:47:10 -0500 Subject: [PATCH 12/13] Add e2e test --- dcpy/test_integration/conftest.py | 55 +++++ .../lifecycle/builds/conftest.py | 40 ++++ .../lifecycle/builds/test_lifecycle_e2e.py | 211 ++++++++++++++++++ 3 files changed, 306 insertions(+) create mode 100644 dcpy/test_integration/lifecycle/builds/conftest.py create mode 100644 dcpy/test_integration/lifecycle/builds/test_lifecycle_e2e.py diff --git a/dcpy/test_integration/conftest.py b/dcpy/test_integration/conftest.py index 257700a10a..462a67904c 100644 --- a/dcpy/test_integration/conftest.py +++ b/dcpy/test_integration/conftest.py @@ -1,6 +1,8 @@ import os from pathlib import Path +import pytest + RESOURCES_DIR = Path(__file__).parent / "resources" # Connector buckets @@ -17,3 +19,56 @@ os.environ["PGUSER"] = "postgres" os.environ["PGPASSWORD"] = "postgres" + + +# Add incremental pytest. 
Docs here: https://docs.pytest.org/en/latest/example/simple.html#incremental-testing-test-steps +# store history of failures per test class name and per index in parametrize (if parametrize used) +_test_failed_incremental: dict[str, dict[tuple[int, ...], str]] = {} + + +def pytest_configure(config): + """Register custom marks.""" + config.addinivalue_line( + "markers", + "incremental: mark test to run incrementally, stopping on first failure", + ) + + +def pytest_runtest_makereport(item, call): + if "incremental" in item.keywords: + # incremental marker is used + if call.excinfo is not None: + # the test has failed + # retrieve the class name of the test + cls_name = str(item.cls) + # retrieve the index of the test (if parametrize is used in combination with incremental) + parametrize_index = ( + tuple(item.callspec.indices.values()) + if hasattr(item, "callspec") + else () + ) + # retrieve the name of the test function + test_name = item.originalname or item.name + # store in _test_failed_incremental the original name of the failed test + _test_failed_incremental.setdefault(cls_name, {}).setdefault( + parametrize_index, test_name + ) + + +def pytest_runtest_setup(item): + if "incremental" in item.keywords: + # retrieve the class name of the test + cls_name = str(item.cls) + # check if a previous test has failed for this class + if cls_name in _test_failed_incremental: + # retrieve the index of the test (if parametrize is used in combination with incremental) + parametrize_index = ( + tuple(item.callspec.indices.values()) + if hasattr(item, "callspec") + else () + ) + # retrieve the name of the first test function to fail for this class name and index + test_name = _test_failed_incremental[cls_name].get(parametrize_index, None) + # if name found, test has failed for the combination of class name & test name + if test_name is not None: + pytest.xfail(f"previous test failed ({test_name})") diff --git a/dcpy/test_integration/lifecycle/builds/conftest.py 
b/dcpy/test_integration/lifecycle/builds/conftest.py new file mode 100644 index 0000000000..88e62b333f --- /dev/null +++ b/dcpy/test_integration/lifecycle/builds/conftest.py @@ -0,0 +1,40 @@ +import pytest + +from dcpy.connectors.edm.builds import BuildsConnector +from dcpy.connectors.edm.drafts import DraftsConnector +from dcpy.connectors.edm.published import PublishedConnector +from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType +from dcpy.lifecycle import connector_registry +from dcpy.lifecycle.connector_registry import connectors as lifecycle_connectors + + +@pytest.fixture(scope="module") +def setup_local_connectors(tmp_path_factory): + """Setup local storage connectors for the test module.""" + # Create a temporary directory for the entire test module + tmp_path = tmp_path_factory.mktemp("lifecycle_test") + + # Clear and setup connectors with local storage + lifecycle_connectors.clear() + + # Create and register local storage connectors + local_storage = PathedStorageConnector.from_storage_kwargs( + conn_type="local_path_conn", + storage_backend=StorageType.LOCAL, + local_dir=tmp_path, + _validate_root_path=False, + ) + + [ + lifecycle_connectors.register(c) + for c in [ + BuildsConnector(storage=local_storage), + DraftsConnector(storage=local_storage), + PublishedConnector(storage=local_storage), + ] + ] + + yield tmp_path + + # Reset connectors after test module completes + connector_registry._set_default_connectors() diff --git a/dcpy/test_integration/lifecycle/builds/test_lifecycle_e2e.py b/dcpy/test_integration/lifecycle/builds/test_lifecycle_e2e.py new file mode 100644 index 0000000000..56a531e91d --- /dev/null +++ b/dcpy/test_integration/lifecycle/builds/test_lifecycle_e2e.py @@ -0,0 +1,211 @@ +import json +from datetime import datetime + +import pytest +import pytz + +from dcpy.lifecycle.builds.artifacts import builds, drafts, published +from dcpy.models.lifecycle.builds import BuildMetadata, Recipe, RecipeInputs + + 
+@pytest.mark.incremental +class TestLifecycleE2E: + """Test the complete build lifecycle in sequence.""" + + @classmethod + def setup_class(cls): + """Initialize test data shared across all test methods.""" + cls.product = "db-mock-product" + cls.build_name = "mock-build" + cls.version = "24v3" + + def test_build_upload(self, setup_local_connectors, tmp_path): + """Test creating and uploading a build.""" + build_path = tmp_path + + # Create mock build metadata + recipe_inputs = RecipeInputs(datasets=[]) + recipe = Recipe( + name="mock-recipe", + product=self.product, + version=self.version, + inputs=recipe_inputs, + ) + + metadata = BuildMetadata( + version=self.version, + timestamp=datetime.now(pytz.timezone("America/New_York")).isoformat(), + recipe=recipe, + ) + + metadata_file = build_path / "build_metadata.json" + with open(metadata_file, "w") as f: + json.dump(metadata.model_dump(mode="json"), f, indent=4) + + # Create a simple data file + data_file = build_path / "data.txt" + data_file.write_text("mock data content") + + # Upload to builds + build_key = builds.upload( + output_path=build_path, product=self.product, build=self.build_name + ) + + assert build_key.product == self.product + assert build_key.build == self.build_name + + # Query the uploaded build to verify it exists + build_metadata = builds.get_build_metadata( + product=self.product, build=self.build_name + ) + assert build_metadata.version == self.version + assert build_metadata.recipe.product == self.product + + def test_promote_to_drafts(self, setup_local_connectors): + """Test promoting builds to drafts with revisions.""" + # Assert that no drafts exist initially + draft_versions = drafts.get_dataset_versions(self.product) + assert len(draft_versions) == 0, ( + f"No drafts should exist initially. 
Found {draft_versions}" + ) + + # Promote that build to a draft (no message) + builds.promote_to_draft( + product=self.product, build=self.build_name, draft_revision_summary="" + ) + + # Assert that the draft exists and the revision num is 1 + draft_versions = drafts.get_dataset_versions(self.product) + assert len(draft_versions) == 1, "Should have one draft version" + assert self.version in draft_versions[0], ( + f"Draft version should contain {self.version}" + ) + + revision_list = drafts.get_dataset_version_revisions(self.product, self.version) + assert len(revision_list) == 1, "Should have one revision" + assert "1" in revision_list[0], ( + f"First revision should be numbered 1. Found revisions {revision_list}" + ) + + # Promote again with a message + builds.promote_to_draft( + product=self.product, + build=self.build_name, + draft_revision_summary="fix-the-bug", + ) + + # Assert that the draft revision num is 2 + revision_list = drafts.get_dataset_version_revisions(self.product, self.version) + assert len(revision_list) == 2, "Should have two revisions" + + # Check that we have revision 1 and 2 + revision_numbers = [int(rev.split("-")[0]) for rev in revision_list] + assert 1 in revision_numbers, "Should have revision 1" + assert 2 in revision_numbers, "Should have revision 2" + + # Check that revision 2 has the message + revision_2 = [rev for rev in revision_list if rev.startswith("2-")][0] + assert "fix-the-bug" in revision_2, "Revision 2 should contain the message" + + def test_publish_from_drafts(self, setup_local_connectors, tmp_path): + """Test publishing drafts to published versions.""" + # Publish revision draft 1 + published.publish( + product=self.product, version=self.version, draft_revision_num=1 + ) + + # Assert that the published version exists + published_versions = published.get_versions(self.product) + assert len(published_versions) >= 1, ( + "Should have at least one published version" + ) + assert self.version in published_versions, ( + 
f"Published version {self.version} should exist" + ) + + # Try: publish revision draft 2, without specifying patch flag, assert error + with pytest.raises(ValueError, match="already exists.*patch wasn't selected"): + published.publish( + product=self.product, + version=self.version, + draft_revision_num=2, + is_patch=False, + ) + + # Publish revision draft 2, specifying that it's a patch + metadata_file_dir = tmp_path / "pub" + published.publish( + product=self.product, + version=self.version, + draft_revision_num=2, + is_patch=True, + latest=True, + metadata_file_dir=tmp_path / metadata_file_dir, + ) + + # Assert that the published patched version exists + published_versions_final = published.get_versions(self.product) + assert len(published_versions_final) >= 2, ( + "Should have at least two published versions" + ) + # Check latest + versions_w_latest = published.get_versions(self.product, exclude_latest=False) + assert "latest" in versions_w_latest, ( + f"Expected a 'latest' version. 
Instead found {versions_w_latest}" + ) + assert (metadata_file_dir / "build_metadata.json").exists(), ( + "The build md should be downloaded" + ) + + # Should have original version and patched version like 24v3.0.1 + version_strings = published_versions_final + original_versions = [v for v in version_strings if v == self.version] + patched_versions = [ + v + for v in version_strings + if v.startswith(f"{self.version}.") and v != self.version + ] + + # Check that the actual version (from the build md) was patched correctly + actual_patched_version = published.fetch_version_from_metadata( + self.product, "latest" + ) + assert actual_patched_version == "24v3.0.1" + + assert len(original_versions) >= 1, ( + f"Should have original {self.version} version" + ) + assert len(patched_versions) >= 1, "Should have at least one patched version" + + def test_get_previous_version(self, setup_local_connectors): + """Test getting previous version of published data.""" + # At this point we should have at least 2 published versions: + # - The original version (24v3) + # - The patched version (24v3.0.1) + + published_versions = published.get_versions(self.product) + assert len(published_versions) >= 2, ( + "Should have at least two published versions for previous version test" + ) + + # Sort versions to get predictable order + sorted_versions = sorted(published_versions) + latest_version = sorted_versions[-1] # Highest version + + # Test getting previous version of the latest + previous = published.get_previous_version(self.product, latest_version) + expected_previous = published.get_versions(self.product)[ + 1 + ] # Second in desc order + assert str(previous.label) == expected_previous, ( + f"Previous version of {latest_version} should be {expected_previous}" + ) + + # Test with the oldest version - should raise error + oldest_version = sorted_versions[0] + with pytest.raises(LookupError, match="is the oldest published version"): + published.get_previous_version(self.product,
oldest_version) + + # Test with non-existent product + with pytest.raises(LookupError, match="No published versions found"): + published.get_previous_version("nonexistent-product", "25v1") From 70ed00f33eb91e30da5933de42004ffd70922c1a Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Fri, 6 Feb 2026 16:00:06 -0500 Subject: [PATCH 13/13] post-review: remove slop-comments --- dcpy/connectors/edm/builds.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dcpy/connectors/edm/builds.py b/dcpy/connectors/edm/builds.py index 707780dba2..ba42cee925 100644 --- a/dcpy/connectors/edm/builds.py +++ b/dcpy/connectors/edm/builds.py @@ -149,10 +149,8 @@ def _pull( ) -> dict: build_key = BuildKey(key, version) - # Construct the source key for the file source_key = f"{build_key.path}/{filepath}" - # Check if the file exists if not self.storage.exists(source_key): raise FileNotFoundError(f"File {source_key} not found") @@ -168,7 +166,6 @@ def _pull( f"Downloading {build_key}, {filepath}, {source_key} -> {output_filepath}" ) - # Use PathedStorageConnector's pull method self.storage.pull(key=source_key, destination_path=output_filepath) return {"path": output_filepath}