diff --git a/config/README.md b/config/README.md index 29ee76e..92eff49 100644 --- a/config/README.md +++ b/config/README.md @@ -36,6 +36,11 @@ default: `50000` Useful for low resource utilization. This will ensure all data is stored in multiple chunks of almost `sample_chunksize` samples. This does not hamper any logic in algorithms but simply ensures that the entire dataset is never loaded all at once on the RAM. `null` value will disregard this optimization. +**num_workers** {int}: `int | null` +default: `1` +This param uses multiple workers in parallel to speed up the data writing to disk. Please use this +with careful consideration of the number of cores available in the device. *Note that this doesn't increase memory usage of pipeline*. Ideal increment found at `num_workers = 3`. + **train_val_test** {dict}: This section splits the data using the mentioned splitting technique mentioned in `splitter_config` & required params like `split_ratio` and `stratify` options. Example below. diff --git a/config/config.yaml b/config/config.yaml index 5725d45..aebd971 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -13,6 +13,7 @@ experiment: # DATA CONFIG. 
data: sample_chunksize: 20000 + num_workers: 1 train_val_test: full_datapath: '/path/to/anndata.h5ad' @@ -42,6 +43,7 @@ feature_selection: # score_matrix: '/path/to/matrix' feature_subsetsize: 5000 + num_workers: 1 model: name: SequentialModel diff --git a/requirements.txt b/requirements.txt index 9c9f24b..e0620e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ anndata==0.10.9 isort==5.13.2 +loky==3.4.1 memory-profiler==0.61.0 pillow==10.4.0 pre_commit==4.0.1 diff --git a/scalr/data/preprocess/_preprocess.py b/scalr/data/preprocess/_preprocess.py index 68a60a2..57d2280 100644 --- a/scalr/data/preprocess/_preprocess.py +++ b/scalr/data/preprocess/_preprocess.py @@ -50,8 +50,11 @@ def fit( """ pass - def process_data(self, full_data: Union[AnnData, AnnCollection], - sample_chunksize: int, dirpath: str): + def process_data(self, + full_data: Union[AnnData, AnnCollection], + sample_chunksize: int, + dirpath: str, + num_workers: int = 1): """A function to process the entire data chunkwise and write the processed data to disk. @@ -59,6 +62,7 @@ def process_data(self, full_data: Union[AnnData, AnnCollection], full_data (Union[AnnData, AnnCollection]): Full data for transformation. sample_chunksize (int): Number of samples in one chunk. dirpath (str): Path to write the data to. + num_workers (int): number of jobs to run in parallel for data writing. 
""" if not sample_chunksize: # TODO @@ -68,7 +72,8 @@ def process_data(self, full_data: Union[AnnData, AnnCollection], write_chunkwise_data(full_data, sample_chunksize, dirpath, - transform=self.transform) + transform=self.transform, + num_workers=num_workers) def build_preprocessor( diff --git a/scalr/data/preprocess/sample_norm.py b/scalr/data/preprocess/sample_norm.py index 96fa128..a9bc618 100644 --- a/scalr/data/preprocess/sample_norm.py +++ b/scalr/data/preprocess/sample_norm.py @@ -5,7 +5,6 @@ import numpy as np from scalr.data.preprocess import PreprocessorBase -from scalr.utils import EventLogger class SampleNorm(PreprocessorBase): @@ -20,8 +19,6 @@ def __init__(self, scaling_factor: float = 1.0): self.scaling_factor = scaling_factor - self.event_logger = EventLogger('Sample norm normalization') - def transform(self, data: np.ndarray) -> np.ndarray: """A function to transform provided input data. @@ -31,8 +28,6 @@ def transform(self, data: np.ndarray) -> np.ndarray: Returns: np.ndarray: Processed data. """ - self.event_logger.info('\Transforming data using sample norm.') - data *= (self.scaling_factor / (data.sum(axis=1).reshape(len(data), 1))) return data diff --git a/scalr/data/preprocess/standard_scale.py b/scalr/data/preprocess/standard_scale.py index 1e747ce..b095f0b 100644 --- a/scalr/data/preprocess/standard_scale.py +++ b/scalr/data/preprocess/standard_scale.py @@ -7,7 +7,6 @@ import numpy as np from scalr.data.preprocess import PreprocessorBase -from scalr.utils import EventLogger class StandardScaler(PreprocessorBase): @@ -28,8 +27,6 @@ def __init__(self, with_mean: bool = True, with_std: bool = True): self.train_mean = None self.train_std = None - self.event_logger = EventLogger('Standard Scaler Normalization') - def transform(self, data: np.ndarray) -> np.ndarray: """A function to transform provided input data. 
@@ -39,9 +36,6 @@ def transform(self, data: np.ndarray) -> np.ndarray: Returns: np.ndarray: processed data """ - self.event_logger.info( - '\Transforming data using standard scaler object') - if not self.with_mean: train_mean = np.zeros((1, data.shape[1])) else: @@ -58,9 +52,6 @@ def fit(self, data: Union[AnnData, AnnCollection], """ - self.event_logger.info('\n\nStarting standardscaler normalization') - self.event_logger.info('\nFitting standard scaler object on train data') - self.calculate_mean(data, sample_chunksize) self.calculate_std(data, sample_chunksize) @@ -76,7 +67,6 @@ def calculate_mean(self, data: Union[AnnData, AnnCollection], Nothing, stores mean per feature of the train data. """ - self.event_logger.info('Calculating mean of data...') train_sum = np.zeros(data.shape[1]).reshape(1, -1) # Iterate through batches of data to get mean statistics @@ -85,11 +75,6 @@ def calculate_mean(self, data: Union[AnnData, AnnCollection], sample_chunksize].X.sum(axis=0) self.train_mean = train_sum / data.shape[0] - if not self.with_mean: - self.event_logger.info( - '`train_mean` will be set to zero during `transform()`, as `with_mean` is set to False!' - ) - def calculate_std(self, data: Union[AnnData, AnnCollection], sample_chunksize: int) -> None: """A function to calculate standard deviation for each feature in the train data. @@ -104,7 +89,6 @@ def calculate_std(self, data: Union[AnnData, AnnCollection], # Getting standard deviation of entire train data per feature. if self.with_std: - self.event_logger.info('Calculating standard deviation of data...') self.train_std = np.zeros(data.shape[1]).reshape(1, -1) # Iterate through batches of data to get std statistics for i in range(int(np.ceil(data.shape[0] / sample_chunksize))): @@ -119,8 +103,6 @@ def calculate_std(self, data: Union[AnnData, AnnCollection], self.train_std[self.train_std == 0] = 1 else: # If `with_std` is False, set train_std to 1. 
- self.event_logger.info( - 'Setting `train_std` to be 1, as `with_std` is set to False!') self.train_std = np.ones((1, data.shape[1])) @classmethod diff --git a/scalr/data/split/_split.py b/scalr/data/split/_split.py index 962464e..4cd32e9 100644 --- a/scalr/data/split/_split.py +++ b/scalr/data/split/_split.py @@ -92,9 +92,12 @@ def check_splits(self, datapath: str, data_splits: dict, target: str): self.event_logger.info( f'{metadata[target].iloc[test_inds].value_counts()}\n') - def write_splits(self, full_data: Union[AnnData, AnnCollection], - data_split_indices: dict, sample_chunksize: int, - dirpath: int): + def write_splits(self, + full_data: Union[AnnData, AnnCollection], + data_split_indices: dict, + sample_chunksize: int, + dirpath: int, + num_workers: int = None): """THis function writes the train validation and test splits to the disk. Args: @@ -102,17 +105,18 @@ def write_splits(self, full_data: Union[AnnData, AnnCollection], data_split_indices (dict): Indices of each split. sample_chunksize (int): Number of samples to be written in one file. dirpath (int): Path to write data into. - - Returns: - dict: Path of each split. + num_workers (int): number of jobs to run in parallel for data writing. 
""" for split in data_split_indices.keys(): if sample_chunksize: split_dirpath = path.join(dirpath, split) os.makedirs(split_dirpath, exist_ok=True) - write_chunkwise_data(full_data, sample_chunksize, split_dirpath, - data_split_indices[split]) + write_chunkwise_data(full_data, + sample_chunksize, + split_dirpath, + data_split_indices[split], + num_workers=num_workers) else: filepath = path.join(dirpath, f'{split}.h5ad') write_data(full_data[data_split_indices[split]].to_memory(), diff --git a/scalr/data_ingestion_pipeline.py b/scalr/data_ingestion_pipeline.py index 317e262..7f1d4fe 100644 --- a/scalr/data_ingestion_pipeline.py +++ b/scalr/data_ingestion_pipeline.py @@ -29,6 +29,7 @@ def __init__(self, data_config: dict, dirpath: str = '.'): self.data_config = deepcopy(data_config) self.target = self.data_config.get('target') self.sample_chunksize = self.data_config.get('sample_chunksize') + self.num_workers = self.data_config.get('num_workers', 1) # Make some necessary checks and logs. 
if not self.target: @@ -99,7 +100,8 @@ def generate_train_val_test_split(self): splitter.write_splits(self.full_data, train_val_test_split_indices, self.sample_chunksize, - train_val_test_split_dirpath) + train_val_test_split_dirpath, + self.num_workers) # Garbage collection del self.full_data @@ -146,7 +148,8 @@ def preprocess_data(self): for split in ['train', 'val', 'test']: split_data = read_data(path.join(datapath, split)) preprocessor.process_data(split_data, self.sample_chunksize, - path.join(processed_datapath, split)) + path.join(processed_datapath, split), + self.num_workers) datapath = processed_datapath diff --git a/scalr/feature/feature_subsetting.py b/scalr/feature/feature_subsetting.py index a3bc495..561d20e 100644 --- a/scalr/feature/feature_subsetting.py +++ b/scalr/feature/feature_subsetting.py @@ -7,11 +7,15 @@ from anndata import AnnData from anndata.experimental import AnnCollection +from joblib import delayed +from joblib import Parallel from torch import nn from scalr.model_training_pipeline import ModelTrainingPipeline from scalr.utils import EventLogger from scalr.utils import FlowLogger +from scalr.utils import read_data +from scalr.utils import write_chunkwise_data class FeatureSubsetting: @@ -30,7 +34,9 @@ def __init__(self, target: str, mappings: dict, dirpath: str = None, - device: str = 'cpu'): + device: str = 'cpu', + num_workers: int = 1, + sample_chunksize: int = None): """Initialize required parameters for feature subset training. Args: @@ -43,9 +49,12 @@ def __init__(self, mappings (dict): mapping of target to labels. dirpath (str, optional): Dirpath to store chunked model weights. Defaults to None. device (str, optional): Device to train models on. Defaults to 'cpu'. + num_workers (int, optional): Number of parallel processes to launch to train multiple + feature subsets simultaneously. Defaults to using single + process. + sample_chunksize (int, optional): Chunks of samples to be loaded in memory at once. 
+ Required when `num_workers` > 1. """ - self.event_logger = EventLogger('FeatureSubsetting') - self.feature_subsetsize = feature_subsetsize self.chunk_model_config = chunk_model_config self.chunk_model_train_config = chunk_model_train_config @@ -55,6 +64,52 @@ def __init__(self, self.mappings = mappings self.dirpath = dirpath self.device = device + self.num_workers = num_workers if num_workers else 1 + self.sample_chunksize = sample_chunksize + + self.total_features = len(self.train_data.var_names) + + # Note that EventLogger does not work with parallel training + # You may use tensorboard logging to track model training logs + if self.num_workers == 1: + self.event_logger = EventLogger('FeatureSubsetting') + + def write_feature_subsetted_data(self): + """Write chunks of feature-subsetted data, to enable parallel training of models + using different chunks of data.""" + if self.num_workers == 1: + return + + self.feature_chunked_data_dirpath = path.join(self.dirpath, + 'chunked_data') + os.makedirs(self.feature_chunked_data_dirpath, exist_ok=True) + + i = 0 + for start in range(0, self.total_features, self.feature_subsetsize): + + feature_subset_inds = list( + range(start, + min(start + self.feature_subsetsize, + self.total_features))) + + write_chunkwise_data(self.train_data, + self.sample_chunksize, + path.join(self.feature_chunked_data_dirpath, + 'train', str(i)), + feature_inds=feature_subset_inds, + num_workers=self.num_workers) + + write_chunkwise_data(self.val_data, + self.sample_chunksize, + path.join(self.feature_chunked_data_dirpath, + 'val', str(i)), + feature_inds=feature_subset_inds, + num_workers=self.num_workers) + + i += 1 + + del self.train_data + del self.val_data def train_chunked_models(self) -> list[nn.Module]: """Trains a model for each subset data. @@ -62,23 +117,30 @@ def train_chunked_models(self) -> list[nn.Module]: Returns: list[nn.Module]: List of models for each subset. 
""" - self.event_logger.info('Feature subset models training') - models = [] + if self.num_workers == 1: + self.event_logger.info('Feature subset models training') + chunked_models_dirpath = path.join(self.dirpath, 'chunked_models') os.makedirs(chunked_models_dirpath, exist_ok=True) - i = 0 - for start in range(0, len(self.train_data.var_names), - self.feature_subsetsize): - self.event_logger.info(f'\nChunk {i}') + def train_chunked_model(i, start): + if self.num_workers == 1: + self.event_logger.info(f'\nChunk {i}') + chunk_dirpath = path.join(chunked_models_dirpath, str(i)) os.makedirs(chunk_dirpath, exist_ok=True) - i += 1 - train_features_subset = self.train_data[:, start:start + + if self.num_workers > 1: + train_features_subset = read_data( + path.join(self.feature_chunked_data_dirpath, 'train', + str(i))) + val_features_subset = read_data( + path.join(self.feature_chunked_data_dirpath, 'val', str(i))) + else: + train_features_subset = self.train_data[:, start:start + + self.feature_subsetsize] + val_features_subset = self.val_data[:, start:start + self.feature_subsetsize] - val_features_subset = self.val_data[:, start:start + - self.feature_subsetsize] chunk_model_config = deepcopy(self.chunk_model_config) @@ -89,14 +151,24 @@ def train_chunked_models(self) -> list[nn.Module]: model_trainer.set_data_and_targets(train_features_subset, val_features_subset, self.target, self.mappings) + model_trainer.build_model_training_artifacts() best_model = model_trainer.train() self.chunk_model_config, self.chunk_model_train_config = model_trainer.get_updated_config( ) - models.append(best_model) + return i, best_model + + parallel = Parallel(n_jobs=self.num_workers) + models = parallel( + delayed(train_chunked_model)(i, start) for i, (start) in enumerate( + range(0, self.total_features, self.feature_subsetsize))) + # parallel loop returns all models with the chunk number, which is used to sort models in order + # model[1] returns only the model, without the chunk number 
+ models = sorted(models) + models = [model[1] for model in models] return models def get_updated_configs(self): diff --git a/scalr/feature_extraction_pipeline.py b/scalr/feature_extraction_pipeline.py index fa0a118..4920731 100644 --- a/scalr/feature_extraction_pipeline.py +++ b/scalr/feature_extraction_pipeline.py @@ -54,10 +54,14 @@ def load_data_and_targets_from_config(self, data_config: dict): data_config) self.target = data_config.get('target') self.mappings = read_data(data_config['label_mappings']) + self.sample_chunksize = data_config.get('sample_chunksize') - def set_data_and_targets(self, train_data: Union[AnnData, AnnCollection], + def set_data_and_targets(self, + train_data: Union[AnnData, AnnCollection], val_data: Union[AnnData, AnnCollection], - target: Union[str, list[str]], mappings: dict): + target: Union[str, list[str]], + mappings: dict, + sample_chunksize: int = None): """A function to set data when you don't use data directly from config, but rather by other sources like feature subsetting, etc. @@ -67,6 +71,7 @@ def set_data_and_targets(self, train_data: Union[AnnData, AnnCollection], target (Union[str, list[str]]): Target columns name(s). mappings (dict): Mapping of a column value to ids eg. mappings[column_name][label2id] = {A: 1, B:2, ...}. + sample_chunksize (int): Chunks of samples to be loaded in memory at once. 
""" self.train_data = train_data self.val_data = val_data @@ -80,6 +85,7 @@ def feature_subsetted_model_training(self) -> list[nn.Module]: self.feature_subsetsize = self.feature_selection_config.get( 'feature_subsetsize', len(self.val_data.var_names)) + self.num_workers = self.feature_selection_config.get('num_workers', 1) chunk_model_config = self.feature_selection_config.get('model') chunk_model_train_config = self.feature_selection_config.get( @@ -88,7 +94,11 @@ def feature_subsetted_model_training(self) -> list[nn.Module]: chunked_features_model_trainer = FeatureSubsetting( self.feature_subsetsize, chunk_model_config, chunk_model_train_config, self.train_data, self.val_data, - self.target, self.mappings, self.dirpath, self.device) + self.target, self.mappings, self.dirpath, self.device, + self.num_workers, self.sample_chunksize) + + if self.num_workers > 1: + chunked_features_model_trainer.write_feature_subsetted_data() self.chunked_models = chunked_features_model_trainer.train_chunked_models( ) @@ -118,6 +128,7 @@ def feature_scoring(self) -> pd.DataFrame: if not getattr(self, 'feature_subsetsize', None): self.feature_subsetsize = self.train_data.shape[1] + # TODO: Parallelize feature scoring for i, (model) in enumerate(self.chunked_models): subset_train_data = self.train_data[:, i * self.feature_subsetsize:(i + @@ -176,6 +187,7 @@ def write_top_features_subset_data(self, data_config: dict) -> dict: self.flow_logger.info('Writing feature-subset data onto disk') datapath = data_config['train_val_test'].get('final_datapaths') + feature_subset_datapath = path.join(self.dirpath, 'feature_subset_data') os.makedirs(feature_subset_datapath, exist_ok=True) @@ -186,15 +198,18 @@ def write_top_features_subset_data(self, data_config: dict) -> dict: 'test': test_data } + sample_chunksize = data_config.get('sample_chunksize') + num_workers = data_config.get('num_workers') + for split, split_data in splits.items(): split_feature_subset_datapath = 
path.join(feature_subset_datapath, split) - sample_chunksize = data_config.get('sample_chunksize') write_chunkwise_data(split_data, sample_chunksize, split_feature_subset_datapath, - feature_inds=self.top_features) + feature_inds=self.top_features, + num_workers=num_workers) data_config['train_val_test'][ 'feature_subset_datapaths'] = feature_subset_datapath diff --git a/scalr/model_training_pipeline.py b/scalr/model_training_pipeline.py index 1964cfa..7c29bf3 100644 --- a/scalr/model_training_pipeline.py +++ b/scalr/model_training_pipeline.py @@ -18,6 +18,7 @@ from scalr.utils import FlowLogger from scalr.utils import load_train_val_data_from_config from scalr.utils import read_data +from scalr.utils import set_seed from scalr.utils import write_data @@ -40,6 +41,7 @@ def __init__(self, device (str, optional): Device to run model on. Defaults to 'cpu'. """ self.flow_logger = FlowLogger('ModelTraining') + set_seed(42) self.train_config = train_config self.model_config = model_config diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index 9939c12..b518ad8 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -1,6 +1,7 @@ """This file contains functions related to file read-write.""" import json +from math import ceil import os from os import path from typing import Union @@ -8,6 +9,8 @@ from anndata import AnnData import anndata as ad from anndata.experimental import AnnCollection +from joblib import delayed +from joblib import Parallel import numpy as np import pandas as pd import yaml @@ -79,7 +82,8 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], dirpath: str, sample_inds: list[int] = None, feature_inds: list[int] = None, - transform=None): + transform: callable = None, + num_workers: int = 1): """This function writes data subsets iteratively in a chunkwise manner, to ensure only at most `sample_chunksize` samples are loaded at a time. 
@@ -96,10 +100,15 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], only a subset of features.dataframe. Defaults to all features. transform (function): a function to apply a transformation on a chunked numpy array. + num_workers (int): Number of jobs to run in parallel for data writing. Additional + workers will not use additional memory, but will be CPU-intensive. """ if not path.exists(dirpath): os.makedirs(dirpath) + if not num_workers: + num_workers = 1 + if not sample_inds: sample_inds = list(range(len(full_data))) @@ -116,18 +125,38 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], if not isinstance(data, AnnData): data = data.to_adata() - data = data.to_memory() + data = data.to_memory(copy=True) for col in data.obs.columns: data.obs[col] = data.obs[col].astype('category') - # Transformation - if transform: - if not isinstance(data.X, np.ndarray): - data.X = data.X.A - data.X = transform(data.X) - - write_data(data, path.join(dirpath, f'{i}.h5ad')) + def transform_and_write_data(data: AnnData, chunk_number: int): + """Internal function to transform a chunk of data and write + it to disk.""" + + # Handling of empty data + if len(data) == 0: + return + + # Transformation + if transform: + data = AnnData(data.X, obs=data.obs, var=data.var) + if not isinstance(data.X, np.ndarray): + data.X = data.X.A + data.X = transform(data.X) + + write_data(data, path.join(dirpath, f'{chunk_number}.h5ad')) + + worker_chunksize = int( + ceil(sample_chunksize / + num_workers)) if num_workers else sample_chunksize + + # Execute parallel jobs for transformation and writing of data. 
+ parallel = Parallel(n_jobs=num_workers) + parallel( + delayed(transform_and_write_data)( + data=data[j * worker_chunksize:(j + 1) * worker_chunksize], + chunk_number=i * num_workers + j) for j in range(num_workers)) def _get_datapath_from_config(data_config): diff --git a/tutorials/pipeline/config_celltype.yaml b/tutorials/pipeline/config_celltype.yaml index 93cf182..a2ef0af 100644 --- a/tutorials/pipeline/config_celltype.yaml +++ b/tutorials/pipeline/config_celltype.yaml @@ -16,6 +16,7 @@ data: train_val_test: full_datapath: 'data/modified_adata.h5ad' + num_workers: 2 splitter_config: name: GroupSplitter @@ -42,6 +43,7 @@ feature_selection: # score_matrix: '/path/to/matrix' feature_subsetsize: 5000 + num_workers: 2 model: name: SequentialModel diff --git a/tutorials/pipeline/config_clinical.yaml b/tutorials/pipeline/config_clinical.yaml index 9cff995..f4ac2e8 100644 --- a/tutorials/pipeline/config_clinical.yaml +++ b/tutorials/pipeline/config_clinical.yaml @@ -16,6 +16,7 @@ data: train_val_test: full_datapath: 'data/modified_adata.h5ad' + num_workers: 2 splitter_config: name: GroupSplitter @@ -42,6 +43,7 @@ feature_selection: # score_matrix: '/path/to/matrix' feature_subsetsize: 5000 + num_workers: 2 model: name: SequentialModel diff --git a/tutorials/pipeline/scalr_pipeline.ipynb b/tutorials/pipeline/scalr_pipeline.ipynb index 610c8ee..23ed459 100644 --- a/tutorials/pipeline/scalr_pipeline.ipynb +++ b/tutorials/pipeline/scalr_pipeline.ipynb @@ -568,8 +568,10 @@ " - The default `exp_run` number is `0`.If not changed, the celltype classification experiment would be `exp_run_0` with all the pipeline results.\n", "- **Data Config**\n", " - Update the `full_datapath` to `data/modified_adata.h5ad` (as we will include `GeneRecallCurve` in the downstream).\n", + " - Specify the `num_workers` value for effective parallelization.\n", " - Set `target` to `cell_type`.\n", "- **Feature Selection**\n", + " - Specify the `num_workers` value for effective parallelization.\n", 
" - Update the model layers to `[5000, 10]`, as there are only 10 cell types in the dataset.\n", " - Change `epoch` to `10`.\n", "- **Final Model Training**\n",