From 3d7bdddd8fb545a5047c5db32b40dea8f280bfc2 Mon Sep 17 00:00:00 2001 From: John Walsh Date: Fri, 29 Aug 2025 16:08:24 -0400 Subject: [PATCH 1/6] add initial blobs on s3 implementation --- extract_slim_features.py | 76 ++++++++++++++----- requirements.txt | 1 + storage.py | 154 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 17 deletions(-) create mode 100644 storage.py diff --git a/extract_slim_features.py b/extract_slim_features.py index 213fb19..cd103e9 100644 --- a/extract_slim_features.py +++ b/extract_slim_features.py @@ -12,6 +12,7 @@ import traceback from ifcb_features.all import compute_features +from storage import S3Config, create_blob_storage FEATURE_COLUMNS = [ 'Area', @@ -46,7 +47,7 @@ 'summedConvexPerimeter_over_Perimeter' ] -def extract_and_save_all_features(data_directory, output_directory, bins=None): +def extract_and_save_all_features(data_directory, output_directory, bins=None, storage_mode="local", s3_config=None): """ Extracts slim features from IFCB images in the given directory and saves them to a CSV file. @@ -56,6 +57,8 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None): output_directory (str): Path to the directory where the CSV file will be saved. bins (list, optional): A list of bin names (e.g., 'D20240423T115846_IFCB127') to process. If None, all bins in the data directory are processed. Defaults to None. + storage_mode (str): Storage mode - "local" or "s3". Defaults to "local". + s3_config (S3Config, optional): S3 configuration when using S3 storage. """ try: data_dir = DataDirectory(data_directory) @@ -87,11 +90,25 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None): for sample in data_dir: samples_to_process.append(sample) - for sample in samples_to_process: - all_features = [] - all_blobs = {} - features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv") - blobs_output_filename = os.path.join(output_directory, f"{sample.lid}_blobs.zip") + # Validate storage configuration + if storage_mode == "s3" and s3_config is None: + raise ValueError("S3 configuration required for S3 storage mode") + if storage_mode not in ["local", "s3"]: + raise ValueError(f"Invalid storage mode '{storage_mode}'. Use 'local' or 's3'") + + # Create blob storage backend + blob_storage = create_blob_storage( + storage_mode=storage_mode, + output_directory=output_directory, + s3_config=s3_config + ) + + print(f"Using {storage_mode} storage mode") + + try: + for sample in samples_to_process: + all_features = [] + features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv") for number, image in sample.images.items(): features = { 'roi_number': number, @@ -100,34 +117,59 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None): blobs_image, roi_features = compute_features(image) features.update(roi_features) + # Store blob using the configured storage backend img_buffer = io.BytesIO() Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG") - all_blobs[number] = img_buffer.getvalue() + blob_data = img_buffer.getvalue() + + blob_storage.store_blob(sample.lid, number, blob_data) + except Exception as e: print(f"Error processing ROI {number} in sample {sample.pid}: {e}") all_features.append(features) - if all_features: - df = pd.DataFrame.from_records(all_features, columns=['roi_number'] + FEATURE_COLUMNS) - df.to_csv(features_output_filename, index=False, float_format='%.8f') - - if all_blobs: - with zipfile.ZipFile(blobs_output_filename, 'w') as zf: - for roi_number, blob_data in all_blobs.items(): - filename = f"{sample.lid}_{roi_number:05d}.png" - zf.writestr(filename, blob_data) + if all_features: + df = pd.DataFrame.from_records(all_features, columns=['roi_number'] + FEATURE_COLUMNS) + df.to_csv(features_output_filename, index=False, float_format='%.8f') + + # Finalize blob storage for this sample + blob_storage.finalize_sample(sample.lid) + + finally: + # Cleanup storage resources + blob_storage.cleanup() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Extract various ROI features and save blobs as 1-bit PNGs.") parser.add_argument("data_directory", help="Path to the directory containing IFCB data.") parser.add_argument("output_directory", help="Path to the directory to save the output CSV file and blobs.") parser.add_argument("--bins", nargs='+', help="List of bin names to process (space-separated). If not provided, all bins are processed.") + + # S3 storage options + parser.add_argument("--storage-mode", choices=["local", "s3"], default="local", + help="Storage mode for blob images (default: local)") + parser.add_argument("--s3-bucket", help="S3 bucket name (required when storage-mode=s3)") + parser.add_argument("--s3-url", help="S3 endpoint URL (required when storage-mode=s3)") + parser.add_argument("--s3-prefix", default="ifcb-blobs/", + help="S3 key prefix for blob storage (default: ifcb-blobs/)") args = parser.parse_args() + + # Set up S3 configuration if using S3 storage + if args.storage_mode == "s3": + if not args.s3_bucket or not args.s3_url: + parser.error("--s3-bucket and --s3-url are required when using --storage-mode=s3") + s3_config = S3Config( + bucket_name=args.s3_bucket, + s3_url=args.s3_url, + prefix=args.s3_prefix + ) + else: + s3_config = None beginning = time.time() - extract_and_save_all_features(args.data_directory, args.output_directory, args.bins) + extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.storage_mode, s3_config) elapsed = time.time() - beginning print(f'Total extract time: {elapsed:.2f} seconds') \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 0d1c87f..1be3313 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ scipy==1.13.1 scikit-image==0.24.0 git+https://github.com/WHOIGit/phasepack@v1.6.1 scikit-learn==1.7.1 +boto3>=1.26.0 diff --git a/storage.py b/storage.py new file mode 100644 index 0000000..f2060e2 --- /dev/null +++ b/storage.py @@ -0,0 +1,154 @@ +from abc import ABC, abstractmethod +import zipfile +import os +import io +import boto3 +import botocore +from typing import Dict, Any +import traceback +from dataclasses import dataclass + + +@dataclass +class S3Config: + """Configuration for S3 blob storage.""" + bucket_name: str + s3_url: str + prefix: str = "ifcb-blobs/" + + +class BlobStorage(ABC): + """Abstract interface for blob storage backends.""" + + @abstractmethod + def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool: + """Store a single blob.""" + pass + + @abstractmethod + def finalize_sample(self, sample_id: str) -> bool: + """Finalize storage for a sample (e.g., close ZIP file).""" + pass + + @abstractmethod + def cleanup(self): + """Cleanup resources.""" + pass + + +class LocalZipStorage(BlobStorage): + """Local ZIP file storage backend (original behavior).""" + + def __init__(self, output_directory: str): + self.output_directory = output_directory + self.zip_files: Dict[str, zipfile.ZipFile] = {} + self.blob_counts: Dict[str, int] = {} + + def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool: + """Store a blob in the ZIP file for this sample.""" + try: + if sample_id not in self.zip_files: + zip_filename = os.path.join(self.output_directory, f"{sample_id}_blobs.zip") + self.zip_files[sample_id] = zipfile.ZipFile(zip_filename, 'w') + self.blob_counts[sample_id] = 0 + + filename = f"{sample_id}_{roi_number:05d}.png" + self.zip_files[sample_id].writestr(filename, blob_data) + self.blob_counts[sample_id] += 1 + return True + + except Exception as e: + print(f"Error storing blob {roi_number} for sample {sample_id}: {e}") + return False + + def finalize_sample(self, sample_id: str) -> bool: + """Close the ZIP file for this sample.""" + if sample_id in self.zip_files: + try: + self.zip_files[sample_id].close() + print(f"Stored {self.blob_counts[sample_id]} blobs for sample {sample_id} in ZIP") + del self.zip_files[sample_id] + del self.blob_counts[sample_id] + return True + except Exception as e: + print(f"Error finalizing ZIP for sample {sample_id}: {e}") + return False + return True + + def cleanup(self): + """Close any remaining ZIP files.""" + for sample_id in list(self.zip_files.keys()): + self.finalize_sample(sample_id) + + +class S3BlobStorage(BlobStorage): + """S3 storage backend using boto3.""" + + def __init__(self, s3_config: S3Config): + self.config = s3_config + self.s3_client = None + self.blob_counts: Dict[str, int] = {} + self._setup_s3_client() + + def _setup_s3_client(self): + """Initialize S3 client.""" + try: + session = boto3.Session() + self.s3_client = session.client( + 's3', + endpoint_url=self.config.s3_url + ) + print(f"Connected to S3 at {self.config.s3_url}") + except Exception as e: + print(f"Failed to setup S3 client: {e}") + raise + + def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool: + """Store a blob in S3.""" + try: + if sample_id not in self.blob_counts: + self.blob_counts[sample_id] = 0 + + key = f"{self.config.prefix}{sample_id}/{roi_number:05d}.png" + + self.s3_client.put_object( + Bucket=self.config.bucket_name, + Key=key, + Body=blob_data, + ContentType='image/png' + ) + + self.blob_counts[sample_id] += 1 + return True + + except Exception as e: + print(f"Error storing blob {roi_number} for sample {sample_id} to S3: {e}") + traceback.print_exc() + return False + + def finalize_sample(self, sample_id: str) -> bool: + """Log completion for this sample.""" + if sample_id in self.blob_counts: + print(f"Stored {self.blob_counts[sample_id]} blobs for sample {sample_id} in S3") + del self.blob_counts[sample_id] + return True + + def cleanup(self): + """Close S3 client.""" + if self.s3_client: + try: + self.s3_client.close() + except: + pass + + +def create_blob_storage(storage_mode: str, output_directory: str, s3_config: S3Config = None) -> BlobStorage: + """Factory function to create appropriate blob storage backend.""" + if storage_mode == "local": + return LocalZipStorage(output_directory) + elif storage_mode == "s3": + if s3_config is None: + raise ValueError("S3 configuration required for S3 storage mode") + return S3BlobStorage(s3_config) + else: + raise ValueError(f"Unknown storage mode: {storage_mode}. Use 'local' or 's3'") From 215c0a477bd3b891c81729f5e2584ab2ef15045f Mon Sep 17 00:00:00 2001 From: John Walsh Date: Mon, 6 Oct 2025 16:46:57 -0400 Subject: [PATCH 2/6] add vastdb upload for features --- extract_slim_features.py | 72 ++++++++++++++++-- pyproject.toml | 7 +- storage.py | 2 +- vastdb_storage.py | 158 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 229 insertions(+), 10 deletions(-) create mode 100644 vastdb_storage.py diff --git a/extract_slim_features.py b/extract_slim_features.py index 32b9a08..f710c4c 100644 --- a/extract_slim_features.py +++ b/extract_slim_features.py @@ -13,8 +13,11 @@ from ifcb_features.all import compute_features from storage import S3Config, create_blob_storage +from vastdb_storage import VastDBFeatureStorage, VastDBConfig FEATURE_COLUMNS = [ + 'sample_id', + 'roi_number', 'Area', 'Biovolume', 'BoundingBox_xwidth', @@ -47,18 +50,19 @@ 'summedConvexPerimeter_over_Perimeter' ] -def extract_and_save_all_features(data_directory, output_directory, bins=None, storage_mode="local", s3_config=None): +def extract_and_save_all_features(data_directory, output_directory, bins=None, storage_mode="local", s3_config=None, vastdb_config=None): """ Extracts slim features from IFCB images in the given directory - and saves them to a CSV file. + and saves them to a CSV file and/or VastDB. Args: data_directory (str): Path to the directory containing IFCB data. output_directory (str): Path to the directory where the CSV file will be saved. bins (list, optional): A list of bin names (e.g., 'D20240423T115846_IFCB127') to process. If None, all bins in the data directory are processed. Defaults to None. - storage_mode (str): Storage mode - "local" or "s3". Defaults to "local". + storage_mode (str): Storage mode for blobs - "local" or "s3". Defaults to "local". s3_config (S3Config, optional): S3 configuration when using S3 storage. + vastdb_config (VastDBConfig, optional): VastDB configuration for storing features. """ try: data_dir = DataDirectory(data_directory) @@ -105,12 +109,19 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s print(f"Using {storage_mode} storage mode") + # Initialize VastDB storage if configured + vastdb_storage = None + if vastdb_config: + vastdb_storage = VastDBFeatureStorage(vastdb_config) + print(f"VastDB feature storage enabled: {vastdb_config.schema_name}.{vastdb_config.table_name}") + try: for sample in samples_to_process: all_features = [] features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv") for number, image in sample.images.items(): features = { + 'sample_id': sample.lid, 'roi_number': number, } try: @@ -130,15 +141,23 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s all_features.append(features) if all_features: - df = pd.DataFrame.from_records(all_features, columns=['roi_number'] + FEATURE_COLUMNS) + df = pd.DataFrame.from_records(all_features, columns=FEATURE_COLUMNS) + + # Save to CSV (always) df.to_csv(features_output_filename, index=False) + # Insert into VastDB if configured + if vastdb_storage: + vastdb_storage.insert_features(df) + # Finalize blob storage for this sample blob_storage.finalize_sample(sample.lid) finally: # Cleanup storage resources blob_storage.cleanup() + if vastdb_storage: + vastdb_storage.cleanup() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Extract various ROI features and save blobs as 1-bit PNGs.") @@ -147,12 +166,22 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s parser.add_argument("--bins", nargs='+', help="List of bin names to process (space-separated). If not provided, all bins are processed.") # S3 storage options - parser.add_argument("--storage-mode", choices=["local", "s3"], default="local", + parser.add_argument("--storage-mode", choices=["local", "s3"], default="local", help="Storage mode for blob images (default: local)") parser.add_argument("--s3-bucket", help="S3 bucket name (required when storage-mode=s3)") parser.add_argument("--s3-url", help="S3 endpoint URL (required when storage-mode=s3)") - parser.add_argument("--s3-prefix", default="ifcb-blobs/", - help="S3 key prefix for blob storage (default: ifcb-blobs/)") + parser.add_argument("--s3-prefix", default="ifcb-blobs-slim-features/", + help="S3 key prefix for blob storage (default: ifcb-blobs-slim-features/)") + + # VastDB feature storage options + parser.add_argument("--vastdb-enable", action="store_true", + help="Enable VastDB feature storage") + parser.add_argument("--vastdb-bucket", help="VastDB bucket name (required when vastdb-enable)") + parser.add_argument("--vastdb-schema", help="VastDB schema name (required when vastdb-enable)") + parser.add_argument("--vastdb-table", help="VastDB table name (required when vastdb-enable)") + parser.add_argument("--vastdb-url", help="VastDB endpoint URL (defaults to s3-url if not provided)") + parser.add_argument("--vastdb-access-key", help="VastDB access key (uses AWS_ACCESS_KEY_ID env var if not provided)") + parser.add_argument("--vastdb-secret-key", help="VastDB secret key (uses AWS_SECRET_ACCESS_KEY env var if not provided)") args = parser.parse_args() @@ -168,8 +197,35 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s else: s3_config = None + # Set up VastDB configuration if enabled + vastdb_config = None + if args.vastdb_enable: + if not args.vastdb_bucket or not args.vastdb_schema or not args.vastdb_table: + parser.error("--vastdb-bucket, --vastdb-schema, and --vastdb-table are required when using --vastdb-enable") + + # Use provided endpoint or fall back to S3 URL + vastdb_url = args.vastdb_url or args.s3_url + if not vastdb_url: + parser.error("--vastdb-url or --s3-url must be provided when using --vastdb-enable") + + # Get credentials from args or environment + access_key = args.vastdb_access_key or os.environ.get('AWS_ACCESS_KEY_ID') + secret_key = args.vastdb_secret_key or os.environ.get('AWS_SECRET_ACCESS_KEY') + + if not access_key or not secret_key: + parser.error("VastDB credentials required: provide --vastdb-access-key/--vastdb-secret-key or set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables") + + vastdb_config = VastDBConfig( + bucket_name=args.vastdb_bucket, + schema_name=args.vastdb_schema, + table_name=args.vastdb_table, + endpoint_url=vastdb_url, + access_key=access_key, + secret_key=secret_key + ) + beginning = time.time() - extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.storage_mode, s3_config) + extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.storage_mode, s3_config, vastdb_config) elapsed = time.time() - beginning print(f'Total extract time: {elapsed:.2f} seconds') \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 9fe2903..f328d78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,11 +26,16 @@ classifiers = [ ] dependencies = [ "numpy", + "pandas", + "pillow", "scipy", "scikit-image", "phasepack @ git+https://github.com/WHOIGit/phasepack@v1.6.1", "scikit-learn", - "pyifcb @ git+https://github.com/joefutrelle/pyifcb@v1.2.1" + "pyifcb @ git+https://github.com/joefutrelle/pyifcb@v1.2.1", + "boto3", + "pyarrow>=18.0", + "vastdb" ] [project.urls] diff --git a/storage.py b/storage.py index d4e7da6..efab6ac 100644 --- a/storage.py +++ b/storage.py @@ -14,7 +14,7 @@ class S3Config: """Configuration for S3 blob storage.""" bucket_name: str s3_url: str - prefix: str = "ifcb-blobs/" + prefix: str = "ifcb-blobs-slim-features/" class BlobStorage(ABC): diff --git a/vastdb_storage.py b/vastdb_storage.py new file mode 100644 index 0000000..0d44267 --- /dev/null +++ b/vastdb_storage.py @@ -0,0 +1,158 @@ +"""VastDB storage for IFCB features.""" +from dataclasses import dataclass +from typing import Optional +import pandas as pd +import pyarrow as pa +import vastdb +from storage import S3Config + + +@dataclass +class VastDBConfig: + """Configuration for VastDB feature storage.""" + bucket_name: str + schema_name: str + table_name: str + endpoint_url: str + access_key: str + secret_key: str + + +class VastDBFeatureStorage: + """VastDB storage backend for features.""" + + def __init__(self, config: VastDBConfig): + self.config = config + self.session = None + self._setup_session() + + def _setup_session(self): + """Initialize VastDB session.""" + try: + self.session = vastdb.connect( + endpoint=self.config.endpoint_url, + access=self.config.access_key, + secret=self.config.secret_key + ) + print(f"Connected to VastDB at {self.config.endpoint_url}") + except Exception as e: + print(f"Failed to connect to VastDB: {e}") + raise + + def _get_features_schema(self) -> pa.Schema: + """Define PyArrow schema for features table.""" + # Composite key: sample_id (string) + roi_number (int64) + # All feature columns are float64 + return pa.schema([ + ('sample_id', pa.string()), + ('roi_number', pa.int64()), + ('Area', pa.float64()), + ('Biovolume', pa.float64()), + ('BoundingBox_xwidth', pa.float64()), + ('BoundingBox_ywidth', pa.float64()), + ('ConvexArea', pa.float64()), + ('ConvexPerimeter', pa.float64()), + ('Eccentricity', pa.float64()), + ('EquivDiameter', pa.float64()), + ('Extent', pa.float64()), + ('MajorAxisLength', pa.float64()), + ('MinorAxisLength', pa.float64()), + ('Orientation', pa.float64()), + ('Perimeter', pa.float64()), + ('RepresentativeWidth', pa.float64()), + ('Solidity', pa.float64()), + ('SurfaceArea', pa.float64()), + ('maxFeretDiameter', pa.float64()), + ('minFeretDiameter', pa.float64()), + ('numBlobs', pa.float64()), + ('summedArea', pa.float64()), + ('summedBiovolume', pa.float64()), + ('summedConvexArea', pa.float64()), + ('summedConvexPerimeter', pa.float64()), + ('summedMajorAxisLength', pa.float64()), + ('summedMinorAxisLength', pa.float64()), + ('summedPerimeter', pa.float64()), + ('summedSurfaceArea', pa.float64()), + ('Area_over_PerimeterSquared', pa.float64()), + ('Area_over_Perimeter', pa.float64()), + ('summedConvexPerimeter_over_Perimeter', pa.float64()), + ]) + + def _ensure_table_exists(self, tx): + """Ensure schema and table exist, create if they don't.""" + try: + # Get or create bucket + bucket = tx.bucket(self.config.bucket_name) + + # Try to get existing schema, create if doesn't exist + try: + schema = bucket.schema(self.config.schema_name) + print(f"Using existing schema: {self.config.schema_name}") + except Exception: + schema = bucket.create_schema(self.config.schema_name) + print(f"Created new schema: {self.config.schema_name}") + + # Try to get existing table, create if doesn't exist + try: + table = schema.table(self.config.table_name) + print(f"Using existing table: {self.config.table_name}") + except Exception: + columns = self._get_features_schema() + table = schema.create_table(self.config.table_name, columns) + print(f"Created new table: {self.config.table_name}") + + return table + + except Exception as e: + print(f"Error ensuring table exists: {e}") + raise + + def insert_features(self, features_df: pd.DataFrame) -> bool: + """Insert features DataFrame into VastDB table.""" + try: + with self.session.transaction() as tx: + table = self._ensure_table_exists(tx) + + # Convert pandas DataFrame to PyArrow Table + arrow_table = pa.Table.from_pandas(features_df, schema=self._get_features_schema()) + + # Insert data + table.insert(arrow_table) + + print(f"Inserted {len(features_df)} rows into {self.config.table_name}") + return True + + except Exception as e: + print(f"Error inserting features into VastDB: {e}") + import traceback + traceback.print_exc() + return False + + def cleanup(self): + """Close VastDB session.""" + if self.session: + try: + # VastDB session cleanup if needed + pass + except Exception as e: + print(f"Error during cleanup: {e}") + + +def create_vastdb_storage_from_s3_config( + s3_config: S3Config, + bucket_name: str, + schema_name: str, + table_name: str, + access_key: str, + secret_key: str +) -> VastDBFeatureStorage: + """Create VastDB storage using S3 config endpoint.""" + vastdb_config = VastDBConfig( + bucket_name=bucket_name, + schema_name=schema_name, + table_name=table_name, + endpoint_url=s3_config.s3_url, + access_key=access_key, + secret_key=secret_key + ) + return VastDBFeatureStorage(vastdb_config) From 510c10e5d943dc0f6b9953dca9fc4678855cedba Mon Sep 17 00:00:00 2001 From: John Walsh Date: Mon, 6 Oct 2025 16:51:34 -0400 Subject: [PATCH 3/6] remove unused requirements.txt --- requirements.txt | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 1be3313..0000000 --- a/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -pandas==2.2.3 -scipy==1.13.1 -scikit-image==0.24.0 -git+https://github.com/WHOIGit/phasepack@v1.6.1 -scikit-learn==1.7.1 -boto3>=1.26.0 From 0dae6acf566f1aaef9291bdd18640b96fd10efcf Mon Sep 17 00:00:00 2001 From: John Walsh Date: Mon, 6 Oct 2025 16:57:01 -0400 Subject: [PATCH 4/6] update storage file naming for clarity --- storage.py => blob_storage.py | 4 +++- extract_slim_features.py | 4 ++-- vastdb_storage.py => feature_storage.py | 0 3 files changed, 5 insertions(+), 3 deletions(-) rename storage.py => blob_storage.py (97%) rename vastdb_storage.py => feature_storage.py (100%) diff --git a/storage.py b/blob_storage.py similarity index 97% rename from storage.py rename to blob_storage.py index efab6ac..557d4f7 100644 --- a/storage.py +++ b/blob_storage.py @@ -1,3 +1,5 @@ +"""S3 and local storage backends for IFCB blob images.""" + from abc import ABC, abstractmethod import zipfile import os @@ -18,7 +20,7 @@ class S3Config: class BlobStorage(ABC): - """Abstract interface for blob storage backends.""" + """Abstract interface for blob storage backends (S3 or local ZIP files).""" @abstractmethod def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool: diff --git a/extract_slim_features.py b/extract_slim_features.py index f710c4c..5c0e3f2 100644 --- a/extract_slim_features.py +++ b/extract_slim_features.py @@ -12,8 +12,8 @@ import traceback from ifcb_features.all import compute_features -from storage import S3Config, create_blob_storage -from vastdb_storage import VastDBFeatureStorage, VastDBConfig +from blob_storage import S3Config, create_blob_storage +from feature_storage import VastDBFeatureStorage, VastDBConfig FEATURE_COLUMNS = [ 'sample_id', diff --git a/vastdb_storage.py b/feature_storage.py similarity index 100% rename from vastdb_storage.py rename to feature_storage.py From 585afe8e27597abd543f20382f3a8ee15604488a Mon Sep 17 00:00:00 2001 From: John Walsh Date: Mon, 6 Oct 2025 17:34:09 -0400 Subject: [PATCH 5/6] update command arguments for clarity --- extract_slim_features.py | 86 ++++++++++++++++++++++------------------ feature_storage.py | 2 +- 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/extract_slim_features.py b/extract_slim_features.py index 5c0e3f2..c2fe605 100644 --- a/extract_slim_features.py +++ b/extract_slim_features.py @@ -50,19 +50,20 @@ 'summedConvexPerimeter_over_Perimeter' ] -def extract_and_save_all_features(data_directory, output_directory, bins=None, storage_mode="local", s3_config=None, vastdb_config=None): +def extract_and_save_all_features(data_directory, output_directory, bins=None, blob_storage_mode="local", s3_config=None, feature_storage_mode="local", vastdb_config=None): """ Extracts slim features from IFCB images in the given directory - and saves them to a CSV file and/or VastDB. + and saves them to CSV or VastDB. Args: data_directory (str): Path to the directory containing IFCB data. - output_directory (str): Path to the directory where the CSV file will be saved. + output_directory (str): Path to the directory where the CSV file will be saved (if feature_storage_mode=local). bins (list, optional): A list of bin names (e.g., 'D20240423T115846_IFCB127') to process. If None, all bins in the data directory are processed. Defaults to None. - storage_mode (str): Storage mode for blobs - "local" or "s3". Defaults to "local". - s3_config (S3Config, optional): S3 configuration when using S3 storage. - vastdb_config (VastDBConfig, optional): VastDB configuration for storing features. + blob_storage_mode (str): Storage mode for blobs - "local" or "s3". Defaults to "local". + s3_config (S3Config, optional): S3 configuration when using S3 blob storage. + feature_storage_mode (str): Storage mode for features - "local" or "vastdb". Defaults to "local". + vastdb_config (VastDBConfig, optional): VastDB configuration when using VastDB feature storage. """ try: data_dir = DataDirectory(data_directory) @@ -94,26 +95,34 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s for sample in data_dir: samples_to_process.append(sample) - # Validate storage configuration - if storage_mode == "s3" and s3_config is None: - raise ValueError("S3 configuration required for S3 storage mode") - if storage_mode not in ["local", "s3"]: - raise ValueError(f"Invalid storage mode '{storage_mode}'. Use 'local' or 's3'") + # Validate blob storage configuration + if blob_storage_mode == "s3" and s3_config is None: + raise ValueError("S3 configuration required for S3 blob storage mode") + if blob_storage_mode not in ["local", "s3"]: + raise ValueError(f"Invalid blob storage mode '{blob_storage_mode}'. Use 'local' or 's3'") + + # Validate feature storage configuration + if feature_storage_mode == "vastdb" and vastdb_config is None: + raise ValueError("VastDB configuration required for VastDB feature storage mode") + if feature_storage_mode not in ["local", "vastdb"]: + raise ValueError(f"Invalid feature storage mode '{feature_storage_mode}'. Use 'local' or 'vastdb'") # Create blob storage backend blob_storage = create_blob_storage( - storage_mode=storage_mode, + storage_mode=blob_storage_mode, output_directory=output_directory, s3_config=s3_config ) - print(f"Using {storage_mode} storage mode") + print(f"Blob storage: {blob_storage_mode}") - # Initialize VastDB storage if configured + # Initialize feature storage vastdb_storage = None - if vastdb_config: + if feature_storage_mode == "vastdb": vastdb_storage = VastDBFeatureStorage(vastdb_config) - print(f"VastDB feature storage enabled: {vastdb_config.schema_name}.{vastdb_config.table_name}") + print(f"Feature storage: vastdb ({vastdb_config.schema_name}.{vastdb_config.table_name})") + else: + print(f"Feature storage: local CSV") try: for sample in samples_to_process: @@ -143,11 +152,10 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s if all_features: df = pd.DataFrame.from_records(all_features, columns=FEATURE_COLUMNS) - # Save to CSV (always) - df.to_csv(features_output_filename, index=False) - - # Insert into VastDB if configured - if vastdb_storage: + # Save features based on storage mode + if feature_storage_mode == "local": + df.to_csv(features_output_filename, index=False) + elif feature_storage_mode == "vastdb": vastdb_storage.insert_features(df) # Finalize blob storage for this sample @@ -165,30 +173,30 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s parser.add_argument("output_directory", help="Path to the directory to save the output CSV file and blobs.") parser.add_argument("--bins", nargs='+', help="List of bin names to process (space-separated). If not provided, all bins are processed.") - # S3 storage options - parser.add_argument("--storage-mode", choices=["local", "s3"], default="local", + # Blob storage options + parser.add_argument("--blob-storage-mode", choices=["local", "s3"], default="local", help="Storage mode for blob images (default: local)") - parser.add_argument("--s3-bucket", help="S3 bucket name (required when storage-mode=s3)") - parser.add_argument("--s3-url", help="S3 endpoint URL (required when storage-mode=s3)") + parser.add_argument("--s3-bucket", help="S3 bucket name (required when blob-storage-mode=s3)") + parser.add_argument("--s3-url", help="S3 endpoint URL (required when blob-storage-mode=s3)") parser.add_argument("--s3-prefix", default="ifcb-blobs-slim-features/", help="S3 key prefix for blob storage (default: ifcb-blobs-slim-features/)") - # VastDB feature storage options - parser.add_argument("--vastdb-enable", action="store_true", - help="Enable VastDB feature storage") - parser.add_argument("--vastdb-bucket", help="VastDB bucket name (required when vastdb-enable)") - parser.add_argument("--vastdb-schema", help="VastDB schema name (required when vastdb-enable)") - parser.add_argument("--vastdb-table", help="VastDB table name (required when vastdb-enable)") + # Feature storage options + parser.add_argument("--feature-storage-mode", choices=["local", "vastdb"], default="local", + help="Storage mode for features (default: local)") + parser.add_argument("--vastdb-bucket", help="VastDB bucket name (required when feature-storage-mode=vastdb)") + parser.add_argument("--vastdb-schema", help="VastDB schema name (required when feature-storage-mode=vastdb)") + parser.add_argument("--vastdb-table", help="VastDB table name (required when feature-storage-mode=vastdb)") parser.add_argument("--vastdb-url", help="VastDB endpoint URL (defaults to s3-url if not provided)") parser.add_argument("--vastdb-access-key", help="VastDB access key (uses AWS_ACCESS_KEY_ID env var if not provided)") parser.add_argument("--vastdb-secret-key", help="VastDB secret key (uses AWS_SECRET_ACCESS_KEY env var if not provided)") args = parser.parse_args() - # Set up S3 configuration if using S3 storage - if args.storage_mode == "s3": + # Set up S3 configuration if using S3 blob storage + if args.blob_storage_mode == "s3": if not args.s3_bucket or not args.s3_url: - parser.error("--s3-bucket and --s3-url are required when using --storage-mode=s3") + parser.error("--s3-bucket and --s3-url are required when using --blob-storage-mode=s3") s3_config = S3Config( bucket_name=args.s3_bucket, s3_url=args.s3_url, @@ -197,16 +205,16 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s else: s3_config = None - # Set up VastDB configuration if enabled + # Set up VastDB configuration if using VastDB feature storage vastdb_config = None - if args.vastdb_enable: + if args.feature_storage_mode == "vastdb": if not args.vastdb_bucket or not args.vastdb_schema or not args.vastdb_table: - parser.error("--vastdb-bucket, --vastdb-schema, and --vastdb-table are required when using --vastdb-enable") + parser.error("--vastdb-bucket, --vastdb-schema, and --vastdb-table are required when using --feature-storage-mode=vastdb") # Use provided endpoint or fall back to S3 URL vastdb_url = args.vastdb_url or args.s3_url if not vastdb_url: - parser.error("--vastdb-url or --s3-url must be provided when using --vastdb-enable") + parser.error("--vastdb-url or --s3-url must be provided when using --feature-storage-mode=vastdb") # Get credentials from args or environment access_key = args.vastdb_access_key or os.environ.get('AWS_ACCESS_KEY_ID') @@ -225,7 +233,7 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, s ) beginning = time.time() - extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.storage_mode, s3_config, vastdb_config) + extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.blob_storage_mode, s3_config, args.feature_storage_mode, vastdb_config) elapsed = time.time() - beginning print(f'Total extract time: {elapsed:.2f} seconds') \ No newline at end of file diff --git a/feature_storage.py b/feature_storage.py index 0d44267..72c3f15 100644 --- a/feature_storage.py +++ b/feature_storage.py @@ -4,7 +4,7 @@ import pandas as pd import pyarrow as pa import vastdb -from storage import S3Config +from blob_storage import S3Config @dataclass From 6c50bb6c6a7a8c09c730a6a1865441c6ba96a541 Mon Sep 17 00:00:00 2001 From: John Walsh Date: Thu, 16 Oct 2025 10:36:06 -0400 Subject: [PATCH 6/6] fix: process all bins in directories with multiple bins --- extract_slim_features.py | 62 +++++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/extract_slim_features.py b/extract_slim_features.py index c2fe605..15576ea 100644 --- a/extract_slim_features.py +++ b/extract_slim_features.py @@ -128,38 +128,46 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None, b for sample in samples_to_process: all_features = [] features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv") - for number, image in sample.images.items(): - features = { - 'sample_id': sample.lid, - 'roi_number': number, - } + try: - blobs_image, roi_features = compute_features(image) - features.update(roi_features) - - # Store blob using the configured storage backend - img_buffer = io.BytesIO() - Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG") - blob_data = img_buffer.getvalue() - - blob_storage.store_blob(sample.lid, number, blob_data) - - except Exception as e: - print(f"Error processing ROI {number} in sample {sample.pid}: {e}") + with sample: # Open ROI file + for number, image in sample.images.items(): + features = { + 'sample_id': sample.lid, + 'roi_number': number, + } + try: + blobs_image, roi_features = compute_features(image) + features.update(roi_features) + + # Store blob using the configured storage backend + img_buffer = io.BytesIO() + Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG") + blob_data = img_buffer.getvalue() + + blob_storage.store_blob(sample.lid, number, blob_data) - all_features.append(features) + except Exception as e: + print(f"Error processing ROI {number} in sample {sample.pid}: {e}") - if all_features: - df = pd.DataFrame.from_records(all_features, columns=FEATURE_COLUMNS) + all_features.append(features) - # Save features based on storage mode - if feature_storage_mode == "local": - df.to_csv(features_output_filename, index=False) - elif feature_storage_mode == "vastdb": - vastdb_storage.insert_features(df) + if all_features: + df = pd.DataFrame.from_records(all_features, columns=FEATURE_COLUMNS) - # Finalize blob storage for this sample - blob_storage.finalize_sample(sample.lid) + # Save features based on storage mode + if feature_storage_mode == "local": + df.to_csv(features_output_filename, index=False) + elif feature_storage_mode == "vastdb": + vastdb_storage.insert_features(df) + + # Finalize blob storage for this sample + blob_storage.finalize_sample(sample.lid) + + except Exception as e: + print(f"Error processing sample {sample.pid}: {e}") + traceback.print_exc() + continue finally: # Cleanup storage resources