From 91037f6e14265e190a6a981223ce4832f679f02c Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Thu, 24 Oct 2024 16:49:10 +0530
Subject: [PATCH 01/14] parallel chunkwise writing using num_workers param from config -- not working due to event logging in pre-processing

---
 scalr/data/preprocess/_preprocess.py | 11 ++++++---
 scalr/data/split/_split.py           | 20 +++++++++-------
 scalr/data_ingestion_pipeline.py     |  7 ++++--
 scalr/feature_extraction_pipeline.py |  7 ++++--
 scalr/utils/file_utils.py            | 35 +++++++++++++++++++++-------
 5 files changed, 57 insertions(+), 23 deletions(-)

diff --git a/scalr/data/preprocess/_preprocess.py b/scalr/data/preprocess/_preprocess.py
index 68a60a2..989bd6b 100644
--- a/scalr/data/preprocess/_preprocess.py
+++ b/scalr/data/preprocess/_preprocess.py
@@ -50,8 +50,11 @@ def fit(
         """
         pass

-    def process_data(self, full_data: Union[AnnData, AnnCollection],
-                     sample_chunksize: int, dirpath: str):
+    def process_data(self,
+                     full_data: Union[AnnData, AnnCollection],
+                     sample_chunksize: int,
+                     dirpath: str,
+                     num_workers: int = None):
         """A function to process the entire data chunkwise and write the
         processed data to disk.

@@ -59,6 +62,7 @@ def process_data(self, full_data: Union[AnnData, AnnCollection],
             full_data (Union[AnnData, AnnCollection]): Full data for transformation.
             sample_chunksize (int): Number of samples in one chunk.
             dirpath (str): Path to write the data to.
+            num_workers (int): number of jobs to run in parallel for data writing.
         """
         if not sample_chunksize:
             # TODO
@@ -68,7 +72,8 @@ def process_data(self, full_data: Union[AnnData, AnnCollection],
         write_chunkwise_data(full_data,
                              sample_chunksize,
                              dirpath,
-                             transform=self.transform)
+                             transform=self.transform,
+                             num_workers=num_workers)


 def build_preprocessor(
diff --git a/scalr/data/split/_split.py b/scalr/data/split/_split.py
index f2ffe14..c502d0e 100644
--- a/scalr/data/split/_split.py
+++ b/scalr/data/split/_split.py
@@ -92,9 +92,12 @@ def check_splits(self, datapath: str, data_splits: dict, target: str):
             self.event_logger.info(
                 f'{metadata[target].iloc[test_inds].value_counts()}\n')

-    def write_splits(self, full_data: Union[AnnData, AnnCollection],
-                     data_split_indices: dict, sample_chunksize: int,
-                     dirpath: int):
+    def write_splits(self,
+                     full_data: Union[AnnData, AnnCollection],
+                     data_split_indices: dict,
+                     sample_chunksize: int,
+                     dirpath: str,
+                     num_workers: int = None):
         """This function writes the train, validation and test splits to the disk.

         Args:
@@ -102,17 +105,18 @@ def write_splits(self, full_data: Union[AnnData, AnnCollection],
             data_split_indices (dict): Indices of each split.
             sample_chunksize (int): Number of samples to be written in one file.
             dirpath (str): Path to write data into.
-
-        Returns:
-            dict: Path of each split.
+            num_workers (int): number of jobs to run in parallel for data writing.
""" for split in data_split_indices.keys(): if sample_chunksize: split_dirpath = path.join(dirpath, split) os.makedirs(split_dirpath, exist_ok=True) - write_chunkwise_data(full_data, sample_chunksize, split_dirpath, - data_split_indices[split]) + write_chunkwise_data(full_data, + sample_chunksize, + split_dirpath, + data_split_indices[split], + num_workers=num_workers) else: filepath = path.join(dirpath, f'{split}.h5ad') write_data(full_data[data_split_indices[split]], filepath) diff --git a/scalr/data_ingestion_pipeline.py b/scalr/data_ingestion_pipeline.py index 317e262..7f1d4fe 100644 --- a/scalr/data_ingestion_pipeline.py +++ b/scalr/data_ingestion_pipeline.py @@ -29,6 +29,7 @@ def __init__(self, data_config: dict, dirpath: str = '.'): self.data_config = deepcopy(data_config) self.target = self.data_config.get('target') self.sample_chunksize = self.data_config.get('sample_chunksize') + self.num_workers = self.data_config.get('num_workers', 1) # Make some necessary checks and logs. if not self.target: @@ -99,7 +100,8 @@ def generate_train_val_test_split(self): splitter.write_splits(self.full_data, train_val_test_split_indices, self.sample_chunksize, - train_val_test_split_dirpath) + train_val_test_split_dirpath, + self.num_workers) # Garbage collection del self.full_data @@ -146,7 +148,8 @@ def preprocess_data(self): for split in ['train', 'val', 'test']: split_data = read_data(path.join(datapath, split)) preprocessor.process_data(split_data, self.sample_chunksize, - path.join(processed_datapath, split)) + path.join(processed_datapath, split), + self.num_workers) datapath = processed_datapath diff --git a/scalr/feature_extraction_pipeline.py b/scalr/feature_extraction_pipeline.py index fa0a118..92bf606 100644 --- a/scalr/feature_extraction_pipeline.py +++ b/scalr/feature_extraction_pipeline.py @@ -186,15 +186,18 @@ def write_top_features_subset_data(self, data_config: dict) -> dict: 'test': test_data } + sample_chunksize = data_config.get('sample_chunksize') + num_workers = data_config.get('num_workers', 1) + for split, split_data in splits.items(): split_feature_subset_datapath = path.join(feature_subset_datapath, split) - sample_chunksize = data_config.get('sample_chunksize') write_chunkwise_data(split_data, sample_chunksize, split_feature_subset_datapath, - feature_inds=self.top_features) + feature_inds=self.top_features, + num_workers=num_workers) data_config['train_val_test'][ 'feature_subset_datapaths'] = feature_subset_datapath diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index 9939c12..cc17e2b 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -8,6 +8,8 @@ from anndata import AnnData import anndata as ad from anndata.experimental import AnnCollection +from joblib import delayed +from joblib import Parallel import numpy as np import pandas as pd import yaml @@ -79,7 +81,8 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], dirpath: str, sample_inds: list[int] = None, feature_inds: list[int] = None, - transform=None): + transform: callable = None, + num_workers: int = None): """This function writes data subsets iteratively in a chunkwise manner, to ensure only at most `sample_chunksize` samples are loaded at a time. @@ -96,6 +99,8 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], only a subset of features.dataframe. Defaults to all features. transform (function): a function to apply a transformation on a chunked numpy array. + num_workers (int): number of jobs to run in parallel for data writing. 
Additional
+            workers will not use additional memory, but will be CPU-intensive.
     """
     if not path.exists(dirpath):
         os.makedirs(dirpath)
@@ -121,13 +126,27 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection],
         for col in data.obs.columns:
             data.obs[col] = data.obs[col].astype('category')

-        # Transformation
-        if transform:
-            if not isinstance(data.X, np.ndarray):
-                data.X = data.X.A
-            data.X = transform(data.X)
-
-        write_data(data, path.join(dirpath, f'{i}.h5ad'))
+        def transform_and_write_data(data: AnnData, chunk_number: int):
+            """Internal function to transform a chunk of data and write
+            it to disk."""
+            # Transformation
+            if transform:
+                if not isinstance(data.X, np.ndarray):
+                    data.X = data.X.A
+                data.X = transform(data.X)
+
+            write_data(data, path.join(dirpath, f'{chunk_number}.h5ad'))
+
+        worker_chunksize = (sample_chunksize //
+                            num_workers) if num_workers else sample_chunksize
+
+        # Execute parallel jobs for transformation and writing of data.
+        # In case of `num_workers = None`, single process is used.
+        parallel = Parallel(n_jobs=num_workers)
+        parallel(
+            delayed(transform_and_write_data)(
+                data=data[j * worker_chunksize:(j + 1) * worker_chunksize],
+                chunk_number=i * num_workers + j) for j in range(num_workers))


 def _get_datapath_from_config(data_config):

From 475a2081d0921ffc86a102be02888631936108b4 Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Fri, 25 Oct 2024 14:17:41 +0530
Subject: [PATCH 02/14] remove event logger for parallelization purposes

---
 scalr/data/preprocess/sample_norm.py    |  5 -----
 scalr/data/preprocess/standard_scale.py | 18 ------------------
 2 files changed, 23 deletions(-)

diff --git a/scalr/data/preprocess/sample_norm.py b/scalr/data/preprocess/sample_norm.py
index 96fa128..a9bc618 100644
--- a/scalr/data/preprocess/sample_norm.py
+++ b/scalr/data/preprocess/sample_norm.py
@@ -5,7 +5,6 @@
 import numpy as np

 from scalr.data.preprocess import PreprocessorBase
-from scalr.utils import EventLogger


 class SampleNorm(PreprocessorBase):
@@ -20,8 +19,6 @@ def __init__(self, scaling_factor: float = 1.0):

         self.scaling_factor = scaling_factor

-        self.event_logger = EventLogger('Sample norm normalization')
-
     def transform(self, data: np.ndarray) -> np.ndarray:
         """A function to transform provided input data.

@@ -31,8 +28,6 @@ def transform(self, data: np.ndarray) -> np.ndarray:
         Returns:
             np.ndarray: Processed data.
         """
-        self.event_logger.info('\Transforming data using sample norm.')
-
         data *= (self.scaling_factor /
                  (data.sum(axis=1).reshape(len(data), 1)))
         return data
diff --git a/scalr/data/preprocess/standard_scale.py b/scalr/data/preprocess/standard_scale.py
index 1e747ce..b095f0b 100644
--- a/scalr/data/preprocess/standard_scale.py
+++ b/scalr/data/preprocess/standard_scale.py
@@ -7,7 +7,6 @@
 import numpy as np

 from scalr.data.preprocess import PreprocessorBase
-from scalr.utils import EventLogger


 class StandardScaler(PreprocessorBase):
@@ -28,8 +27,6 @@ def __init__(self, with_mean: bool = True, with_std: bool = True):
         self.train_mean = None
         self.train_std = None

-        self.event_logger = EventLogger('Standard Scaler Normalization')
-
     def transform(self, data: np.ndarray) -> np.ndarray:
         """A function to transform provided input data.
@@ -39,9 +36,6 @@ def transform(self, data: np.ndarray) -> np.ndarray: Returns: np.ndarray: processed data """ - self.event_logger.info( - '\Transforming data using standard scaler object') - if not self.with_mean: train_mean = np.zeros((1, data.shape[1])) else: @@ -58,9 +52,6 @@ def fit(self, data: Union[AnnData, AnnCollection], """ - self.event_logger.info('\n\nStarting standardscaler normalization') - self.event_logger.info('\nFitting standard scaler object on train data') - self.calculate_mean(data, sample_chunksize) self.calculate_std(data, sample_chunksize) @@ -76,7 +67,6 @@ def calculate_mean(self, data: Union[AnnData, AnnCollection], Nothing, stores mean per feature of the train data. """ - self.event_logger.info('Calculating mean of data...') train_sum = np.zeros(data.shape[1]).reshape(1, -1) # Iterate through batches of data to get mean statistics @@ -85,11 +75,6 @@ def calculate_mean(self, data: Union[AnnData, AnnCollection], sample_chunksize].X.sum(axis=0) self.train_mean = train_sum / data.shape[0] - if not self.with_mean: - self.event_logger.info( - '`train_mean` will be set to zero during `transform()`, as `with_mean` is set to False!' - ) - def calculate_std(self, data: Union[AnnData, AnnCollection], sample_chunksize: int) -> None: """A function to calculate standard deviation for each feature in the train data. @@ -104,7 +89,6 @@ def calculate_std(self, data: Union[AnnData, AnnCollection], # Getting standard deviation of entire train data per feature. if self.with_std: - self.event_logger.info('Calculating standard deviation of data...') self.train_std = np.zeros(data.shape[1]).reshape(1, -1) # Iterate through batches of data to get std statistics for i in range(int(np.ceil(data.shape[0] / sample_chunksize))): @@ -119,8 +103,6 @@ def calculate_std(self, data: Union[AnnData, AnnCollection], self.train_std[self.train_std == 0] = 1 else: # If `with_std` is False, set train_std to 1. - self.event_logger.info( - 'Setting `train_std` to be 1, as `with_std` is set to False!') self.train_std = np.ones((1, data.shape[1])) @classmethod From e7018f7c902143b25053902d6ab511c54a2b29d7 Mon Sep 17 00:00:00 2001 From: Saiyam26 Date: Fri, 25 Oct 2024 14:18:24 +0530 Subject: [PATCH 03/14] create a copy of data to apply pre-processing on during transformation --- scalr/utils/file_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index cc17e2b..cb0da8b 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -129,8 +129,10 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], def transform_and_write_data(data: AnnData, chunk_number: int): """Internal function to transform a chunk of data and write it to disk.""" + # Transformation if transform: + data = AnnData(data.X, obs=data.obs, var=data.var) if not isinstance(data.X, np.ndarray): data.X = data.X.A data.X = transform(data.X) From ac30dcec465852f9b59e4a432fe1ecf507c6e11a Mon Sep 17 00:00:00 2001 From: Saiyam26 Date: Thu, 7 Nov 2024 13:45:40 +0530 Subject: [PATCH 04/14] added num_workers param in config --- config/README.md | 5 +++++ config/config.yaml | 1 + 2 files changed, 6 insertions(+) diff --git a/config/README.md b/config/README.md index 29ee76e..92eff49 100644 --- a/config/README.md +++ b/config/README.md @@ -36,6 +36,11 @@ default: `50000` Useful for low resource utilization. This will ensure all data is stored in multiple chunks of almost `sample_chunksize` samples. 
This does not hamper any logic in algorithms but simply ensures that the entire dataset is never loaded all at once on the RAM. `null` value will disregard this optimization.

+**num_workers** {int}: `int | null`
+default: `1`
+This parameter uses multiple workers in parallel to speed up writing data to disk. Please use it
+with careful consideration of the number of cores available on the machine. *Note that this doesn't increase the memory usage of the pipeline*. We found `num_workers = 3` to be the ideal value.
+
 **train_val_test** {dict}:
 This section splits the data using the splitting technique mentioned in `splitter_config` & required params like `split_ratio` and `stratify` options. Example below.

diff --git a/config/config.yaml b/config/config.yaml
index 5725d45..0cea9fc 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -13,6 +13,7 @@ experiment:
 # DATA CONFIG.
 data:
   sample_chunksize: 20000
+  num_workers: 1

   train_val_test:
     full_datapath: '/path/to/anndata.h5ad'

From 797600e31602064606bd8bf109d3f3ea14d653df Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Fri, 8 Nov 2024 16:08:19 +0530
Subject: [PATCH 05/14] solved bug for num_workers not working with None

---
 scalr/utils/file_utils.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py
index cb0da8b..a092df2 100644
--- a/scalr/utils/file_utils.py
+++ b/scalr/utils/file_utils.py
@@ -105,6 +105,9 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection],
     if not path.exists(dirpath):
         os.makedirs(dirpath)

+    if not num_workers:
+        num_workers = 1
+
     if not sample_inds:
         sample_inds = list(range(len(full_data)))

From 83ff3b6643508d5294d633515c4902ef0da74fe4 Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Mon, 25 Nov 2024 16:32:58 +0530
Subject: [PATCH 06/14] parallel feature subset training; rudimentary

---
 config/config.yaml                   |  1 +
 scalr/feature/feature_subsetting.py  | 97 ++++++++++++++++++++++++----
 scalr/feature_extraction_pipeline.py | 22 +++++--
 3 files changed, 102 insertions(+), 18 deletions(-)

diff --git a/config/config.yaml b/config/config.yaml
index 0cea9fc..aebd971 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -43,6 +43,7 @@ feature_selection:
     # score_matrix: '/path/to/matrix'

   feature_subsetsize: 5000
+  num_workers: 1

 model:
   name: SequentialModel

diff --git a/scalr/feature/feature_subsetting.py b/scalr/feature/feature_subsetting.py
index a3bc495..f9076cd 100644
--- a/scalr/feature/feature_subsetting.py
+++ b/scalr/feature/feature_subsetting.py
@@ -7,11 +7,15 @@
 from anndata import AnnData
 from anndata.experimental import AnnCollection
+from joblib import delayed
+from joblib import Parallel
 from torch import nn

 from scalr.model_training_pipeline import ModelTrainingPipeline
 from scalr.utils import EventLogger
 from scalr.utils import FlowLogger
+from scalr.utils import read_data
+from scalr.utils import write_chunkwise_data


 class FeatureSubsetting:
@@ -30,7 +34,9 @@ def __init__(self,
                  target: str,
                  mappings: dict,
                  dirpath: str = None,
-                 device: str = 'cpu'):
+                 device: str = 'cpu',
+                 num_workers: int = None,
+                 sample_chunksize: int = None):
         """Initialize required parameters for feature subset training.

         Args:
@@ -43,9 +49,11 @@ def __init__(self,
             mappings (dict): mapping of target to labels.
             dirpath (str, optional): Dirpath to store chunked model weights. Defaults to None.
             device (str, optional): Device to train models on. Defaults to 'cpu'.
+ num_workers (int, optional): Number of parallel processes to launch to train multiple + feature subsets simultaneously. Defaults to using single + process. + sample_chunksize (int): Chunks of samples to be loaded in memory at once. """ - self.event_logger = EventLogger('FeatureSubsetting') - self.feature_subsetsize = feature_subsetsize self.chunk_model_config = chunk_model_config self.chunk_model_train_config = chunk_model_train_config @@ -55,6 +63,52 @@ def __init__(self, self.mappings = mappings self.dirpath = dirpath self.device = device + self.num_workers = num_workers if num_workers else 1 + self.sample_chunksize = sample_chunksize + + self.total_features = len(self.train_data.var_names) + + # Note that EventLogger does not work with parallel training + # You may use tensorboard logging to track model training logs + if self.num_workers == 1: + self.event_logger = EventLogger('FeatureSubsetting') + + def write_feature_subsetted_data(self): + """Write chunks of feature-subsetted data, to enable parallel training of models + using different chunks of data.""" + if self.num_workers == 1: + return + + self.feature_chunked_data_dirpath = path.join(self.dirpath, + 'chunked_data') + os.makedirs(self.feature_chunked_data_dirpath, exist_ok=True) + + i = 0 + for start in range(0, self.total_features, self.feature_subsetsize): + + feature_subset_inds = list( + range(start, + min(start + self.feature_subsetsize, + self.total_features))) + + write_chunkwise_data(self.train_data, + self.sample_chunksize, + path.join(self.feature_chunked_data_dirpath, + 'train', str(i)), + feature_inds=feature_subset_inds, + num_workers=self.num_workers) + + write_chunkwise_data(self.val_data, + self.sample_chunksize, + path.join(self.feature_chunked_data_dirpath, + 'val', str(i)), + feature_inds=feature_subset_inds, + num_workers=self.num_workers) + + i += 1 + + del self.train_data + del self.val_data def train_chunked_models(self) -> list[nn.Module]: """Trains a model for each subset data. @@ -62,23 +116,31 @@ def train_chunked_models(self) -> list[nn.Module]: Returns: list[nn.Module]: List of models for each subset. 
""" - self.event_logger.info('Feature subset models training') - models = [] + if self.num_workers == 1: + self.event_logger.info('Feature subset models training') + chunked_models_dirpath = path.join(self.dirpath, 'chunked_models') os.makedirs(chunked_models_dirpath, exist_ok=True) - i = 0 - for start in range(0, len(self.train_data.var_names), - self.feature_subsetsize): - self.event_logger.info(f'\nChunk {i}') + def train_chunked_model(i, start): + if self.num_workers == 1: + self.event_logger.info(f'\nChunk {i}') + chunk_dirpath = path.join(chunked_models_dirpath, str(i)) os.makedirs(chunk_dirpath, exist_ok=True) - i += 1 - train_features_subset = self.train_data[:, start:start + + if self.num_workers > 1: + train_features_subset = read_data( + path.join(self.feature_chunked_data_dirpath, 'train', + str(i))) + val_features_subset = read_data( + path.join(self.feature_chunked_data_dirpath, 'train', + str(i))) + else: + train_features_subset = self.train_data[:, start:start + + self.feature_subsetsize] + val_features_subset = self.val_data[:, start:start + self.feature_subsetsize] - val_features_subset = self.val_data[:, start:start + - self.feature_subsetsize] chunk_model_config = deepcopy(self.chunk_model_config) @@ -95,8 +157,15 @@ def train_chunked_models(self) -> list[nn.Module]: self.chunk_model_config, self.chunk_model_train_config = model_trainer.get_updated_config( ) - models.append(best_model) + return i, best_model + + parallel = Parallel(n_jobs=self.num_workers) + models = parallel( + delayed(train_chunked_model)(i, start) for i, (start) in enumerate( + range(0, self.total_features, self.feature_subsetsize))) + models = sorted(models) + models = [model[1] for model in models] return models def get_updated_configs(self): diff --git a/scalr/feature_extraction_pipeline.py b/scalr/feature_extraction_pipeline.py index 92bf606..7111de4 100644 --- a/scalr/feature_extraction_pipeline.py +++ b/scalr/feature_extraction_pipeline.py @@ -54,10 +54,14 @@ def load_data_and_targets_from_config(self, data_config: dict): data_config) self.target = data_config.get('target') self.mappings = read_data(data_config['label_mappings']) + self.sample_chunksize = data_config.get('sample_chunksize') - def set_data_and_targets(self, train_data: Union[AnnData, AnnCollection], + def set_data_and_targets(self, + train_data: Union[AnnData, AnnCollection], val_data: Union[AnnData, AnnCollection], - target: Union[str, list[str]], mappings: dict): + target: Union[str, list[str]], + mappings: dict, + sample_chunksize: int = None): """A function to set data when you don't use data directly from config, but rather by other sources like feature subsetting, etc. @@ -67,6 +71,7 @@ def set_data_and_targets(self, train_data: Union[AnnData, AnnCollection], target (Union[str, list[str]]): Target columns name(s). mappings (dict): Mapping of a column value to ids eg. mappings[column_name][label2id] = {A: 1, B:2, ...}. + sample_chunksize (int): Chunks of samples to be loaded in memory at once. 
""" self.train_data = train_data self.val_data = val_data @@ -80,6 +85,7 @@ def feature_subsetted_model_training(self) -> list[nn.Module]: self.feature_subsetsize = self.feature_selection_config.get( 'feature_subsetsize', len(self.val_data.var_names)) + self.num_workers = self.feature_selection_config.get('num_workers') chunk_model_config = self.feature_selection_config.get('model') chunk_model_train_config = self.feature_selection_config.get( @@ -88,7 +94,11 @@ def feature_subsetted_model_training(self) -> list[nn.Module]: chunked_features_model_trainer = FeatureSubsetting( self.feature_subsetsize, chunk_model_config, chunk_model_train_config, self.train_data, self.val_data, - self.target, self.mappings, self.dirpath, self.device) + self.target, self.mappings, self.dirpath, self.device, + self.num_workers, self.sample_chunksize) + + if self.num_workers > 1: + chunked_features_model_trainer.write_feature_subsetted_data() self.chunked_models = chunked_features_model_trainer.train_chunked_models( ) @@ -117,7 +127,9 @@ def feature_scoring(self) -> pd.DataFrame: all_scores = [] if not getattr(self, 'feature_subsetsize', None): self.feature_subsetsize = self.train_data.shape[1] + print('ft_subset_size', self.feature_subsetsize) + print('chunked_models', self.chunked_models) for i, (model) in enumerate(self.chunked_models): subset_train_data = self.train_data[:, i * self.feature_subsetsize:(i + @@ -132,6 +144,7 @@ def feature_scoring(self) -> pd.DataFrame: all_scores.append(score[:self.feature_subsetsize]) + print('all_scores', all_scores) columns = self.train_data.var_names columns.name = "index" class_labels = self.mappings[self.target]['id2label'] @@ -176,6 +189,7 @@ def write_top_features_subset_data(self, data_config: dict) -> dict: self.flow_logger.info('Writing feature-subset data onto disk') datapath = data_config['train_val_test'].get('final_datapaths') + feature_subset_datapath = path.join(self.dirpath, 'feature_subset_data') os.makedirs(feature_subset_datapath, exist_ok=True) @@ -187,7 +201,7 @@ def write_top_features_subset_data(self, data_config: dict) -> dict: } sample_chunksize = data_config.get('sample_chunksize') - num_workers = data_config.get('num_workers', 1) + num_workers = data_config.get('num_workers') for split, split_data in splits.items(): From 363e290efb78c632ae6ad14bf0ed2c3239e1afc4 Mon Sep 17 00:00:00 2001 From: Saiyam26 Date: Thu, 28 Nov 2024 15:08:23 +0530 Subject: [PATCH 07/14] removed erroneous validation by train set in feature_subset_training + removed print statement --- scalr/feature/feature_subsetting.py | 4 ++-- scalr/feature_extraction_pipeline.py | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/scalr/feature/feature_subsetting.py b/scalr/feature/feature_subsetting.py index f9076cd..8557609 100644 --- a/scalr/feature/feature_subsetting.py +++ b/scalr/feature/feature_subsetting.py @@ -134,8 +134,7 @@ def train_chunked_model(i, start): path.join(self.feature_chunked_data_dirpath, 'train', str(i))) val_features_subset = read_data( - path.join(self.feature_chunked_data_dirpath, 'train', - str(i))) + path.join(self.feature_chunked_data_dirpath, 'val', str(i))) else: train_features_subset = self.train_data[:, start:start + self.feature_subsetsize] @@ -151,6 +150,7 @@ def train_chunked_model(i, start): model_trainer.set_data_and_targets(train_features_subset, val_features_subset, self.target, self.mappings) + model_trainer.build_model_training_artifacts() best_model = model_trainer.train() diff --git a/scalr/feature_extraction_pipeline.py 
b/scalr/feature_extraction_pipeline.py
index 7111de4..d2bbd3a 100644
--- a/scalr/feature_extraction_pipeline.py
+++ b/scalr/feature_extraction_pipeline.py
@@ -127,9 +127,7 @@ def feature_scoring(self) -> pd.DataFrame:
         all_scores = []
         if not getattr(self, 'feature_subsetsize', None):
             self.feature_subsetsize = self.train_data.shape[1]
-        print('ft_subset_size', self.feature_subsetsize)

-        print('chunked_models', self.chunked_models)
         for i, (model) in enumerate(self.chunked_models):
             subset_train_data = self.train_data[:, i *
                                                 self.feature_subsetsize:(i +
@@ -144,7 +142,6 @@ def feature_scoring(self) -> pd.DataFrame:

             all_scores.append(score[:self.feature_subsetsize])

-        print('all_scores', all_scores)
         columns = self.train_data.var_names
         columns.name = "index"
         class_labels = self.mappings[self.target]['id2label']

From 16ad8c67e3e811f946c4af92188c6da42c482f3e Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Thu, 5 Dec 2024 14:21:40 +0530
Subject: [PATCH 08/14] Setting seed value every time before model training + handling of empty dataset being written during parallel write

---
 scalr/feature_extraction_pipeline.py | 1 +
 scalr/model_training_pipeline.py     | 2 ++
 scalr/utils/file_utils.py            | 4 ++++
 3 files changed, 7 insertions(+)

diff --git a/scalr/feature_extraction_pipeline.py b/scalr/feature_extraction_pipeline.py
index d2bbd3a..a4c9b10 100644
--- a/scalr/feature_extraction_pipeline.py
+++ b/scalr/feature_extraction_pipeline.py
@@ -128,6 +128,7 @@ def feature_scoring(self) -> pd.DataFrame:
         if not getattr(self, 'feature_subsetsize', None):
             self.feature_subsetsize = self.train_data.shape[1]

+        # TODO: Parallelize feature scoring
         for i, (model) in enumerate(self.chunked_models):
             subset_train_data = self.train_data[:, i *
                                                 self.feature_subsetsize:(i +
diff --git a/scalr/model_training_pipeline.py b/scalr/model_training_pipeline.py
index 1964cfa..7c29bf3 100644
--- a/scalr/model_training_pipeline.py
+++ b/scalr/model_training_pipeline.py
@@ -18,6 +18,7 @@
 from scalr.utils import FlowLogger
 from scalr.utils import load_train_val_data_from_config
 from scalr.utils import read_data
+from scalr.utils import set_seed
 from scalr.utils import write_data


@@ -40,6 +41,7 @@ def __init__(self,
             device (str, optional): Device to run model on. Defaults to 'cpu'.
""" self.flow_logger = FlowLogger('ModelTraining') + set_seed(42) self.train_config = train_config self.model_config = model_config diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index a092df2..15528d9 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -133,6 +133,10 @@ def transform_and_write_data(data: AnnData, chunk_number: int): """Internal function to transform a chunk of data and write it to disk.""" + # Handling of empty data + if len(data) == 0: + return + # Transformation if transform: data = AnnData(data.X, obs=data.obs, var=data.var) From e88f9a1bfaad98684bd885b87ea59868585e94c1 Mon Sep 17 00:00:00 2001 From: Saiyam26 Date: Mon, 9 Dec 2024 15:24:14 +0530 Subject: [PATCH 09/14] solved error of missing few samples due to integer division of parallel worker_chunksize --- scalr/utils/file_utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index 15528d9..67dd9c8 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -1,6 +1,7 @@ """This file contains functions related to file read-write.""" import json +from math import ceil import os from os import path from typing import Union @@ -146,8 +147,9 @@ def transform_and_write_data(data: AnnData, chunk_number: int): write_data(data, path.join(dirpath, f'{chunk_number}.h5ad')) - worker_chunksize = (sample_chunksize // - num_workers) if num_workers else sample_chunksize + worker_chunksize = int( + ceil(sample_chunksize / + num_workers)) if num_workers else sample_chunksize # Execute parallel jobs for transformation and witing of data. # In case of `num_workers = None`, single process is used. From 11e77f0d83862958df8d78b626f72f7a1e054081 Mon Sep 17 00:00:00 2001 From: Saiyam26 Date: Mon, 30 Dec 2024 11:37:40 +0530 Subject: [PATCH 10/14] fixed error in parallelization not working for a dataset, andata not correctly loading into memory --- scalr/utils/file_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index 67dd9c8..de35989 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -125,7 +125,7 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], if not isinstance(data, AnnData): data = data.to_adata() - data = data.to_memory() + data = data.to_memory(copy=True) for col in data.obs.columns: data.obs[col] = data.obs[col].astype('category') From 14627fb10b23304d6d7c2ec951e817ddd74de6b0 Mon Sep 17 00:00:00 2001 From: amit-samal Date: Mon, 6 Jan 2025 11:51:05 +0530 Subject: [PATCH 11/14] update requirements.txt & tutorial_config --- requirements.txt | 1 + tutorials/pipeline/config_celltype.yaml | 2 ++ tutorials/pipeline/config_clinical.yaml | 2 ++ tutorials/pipeline/scalr_pipeline.ipynb | 2 ++ 4 files changed, 7 insertions(+) diff --git a/requirements.txt b/requirements.txt index 037c16e..56c6f0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ anndata==0.10.9 isort==5.13.2 +loky==3.4.1 memory-profiler==0.61.0 pillow==10.4.0 pre_commit==4.0.1 diff --git a/tutorials/pipeline/config_celltype.yaml b/tutorials/pipeline/config_celltype.yaml index 93cf182..a2ef0af 100644 --- a/tutorials/pipeline/config_celltype.yaml +++ b/tutorials/pipeline/config_celltype.yaml @@ -16,6 +16,7 @@ data: train_val_test: full_datapath: 'data/modified_adata.h5ad' + num_workers: 2 splitter_config: name: GroupSplitter @@ -42,6 +43,7 @@ feature_selection: # score_matrix: '/path/to/matrix' 
feature_subsetsize: 5000 + num_workers: 2 model: name: SequentialModel diff --git a/tutorials/pipeline/config_clinical.yaml b/tutorials/pipeline/config_clinical.yaml index 9cff995..f4ac2e8 100644 --- a/tutorials/pipeline/config_clinical.yaml +++ b/tutorials/pipeline/config_clinical.yaml @@ -16,6 +16,7 @@ data: train_val_test: full_datapath: 'data/modified_adata.h5ad' + num_workers: 2 splitter_config: name: GroupSplitter @@ -42,6 +43,7 @@ feature_selection: # score_matrix: '/path/to/matrix' feature_subsetsize: 5000 + num_workers: 2 model: name: SequentialModel diff --git a/tutorials/pipeline/scalr_pipeline.ipynb b/tutorials/pipeline/scalr_pipeline.ipynb index 6826c0e..1ac6477 100644 --- a/tutorials/pipeline/scalr_pipeline.ipynb +++ b/tutorials/pipeline/scalr_pipeline.ipynb @@ -550,8 +550,10 @@ " - The default `exp_run` number is `0`.If not changed, the celltype classification experiment would be `exp_run_0` with all the pipeline results.\n", "- **Data Config**\n", " - Update the `full_datapath` to `data/modified_adata.h5ad` (as we will include `GeneRecallCurve` in the downstream).\n", + " - Specify the `num_workers` value for effective parallelization.\n", " - Set `target` to `cell_type`.\n", "- **Feature Selection**\n", + " - Specify the `num_workers` value for effective parallelization.\n", " - Update the model layers to `[5000, 10]`, as there are only 10 cell types in the dataset.\n", " - Change `epoch` to `10`.\n", "- **Final Model Training**\n", From df6a8db96bbfa96b8b6725cb0c28e355221ba5e3 Mon Sep 17 00:00:00 2001 From: Saiyam26 Date: Mon, 6 Jan 2025 15:35:44 +0530 Subject: [PATCH 12/14] setting default values of num_workers to 1 --- scalr/data/preprocess/_preprocess.py | 2 +- scalr/feature/feature_subsetting.py | 5 +++-- scalr/feature_extraction_pipeline.py | 2 +- scalr/utils/file_utils.py | 3 +-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/scalr/data/preprocess/_preprocess.py b/scalr/data/preprocess/_preprocess.py index 989bd6b..57d2280 100644 --- a/scalr/data/preprocess/_preprocess.py +++ b/scalr/data/preprocess/_preprocess.py @@ -54,7 +54,7 @@ def process_data(self, full_data: Union[AnnData, AnnCollection], sample_chunksize: int, dirpath: str, - num_workers: int = None): + num_workers: int = 1): """A function to process the entire data chunkwise and write the processed data to disk. diff --git a/scalr/feature/feature_subsetting.py b/scalr/feature/feature_subsetting.py index 8557609..22b2806 100644 --- a/scalr/feature/feature_subsetting.py +++ b/scalr/feature/feature_subsetting.py @@ -35,7 +35,7 @@ def __init__(self, mappings: dict, dirpath: str = None, device: str = 'cpu', - num_workers: int = None, + num_workers: int = 1, sample_chunksize: int = None): """Initialize required parameters for feature subset training. @@ -52,7 +52,8 @@ def __init__(self, num_workers (int, optional): Number of parallel processes to launch to train multiple feature subsets simultaneously. Defaults to using single process. - sample_chunksize (int): Chunks of samples to be loaded in memory at once. + sample_chunksize (int, optional): Chunks of samples to be loaded in memory at once. + Required when `num_workers` > 1. 
""" self.feature_subsetsize = feature_subsetsize self.chunk_model_config = chunk_model_config diff --git a/scalr/feature_extraction_pipeline.py b/scalr/feature_extraction_pipeline.py index a4c9b10..4920731 100644 --- a/scalr/feature_extraction_pipeline.py +++ b/scalr/feature_extraction_pipeline.py @@ -85,7 +85,7 @@ def feature_subsetted_model_training(self) -> list[nn.Module]: self.feature_subsetsize = self.feature_selection_config.get( 'feature_subsetsize', len(self.val_data.var_names)) - self.num_workers = self.feature_selection_config.get('num_workers') + self.num_workers = self.feature_selection_config.get('num_workers', 1) chunk_model_config = self.feature_selection_config.get('model') chunk_model_train_config = self.feature_selection_config.get( diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index de35989..b097479 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -83,7 +83,7 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], sample_inds: list[int] = None, feature_inds: list[int] = None, transform: callable = None, - num_workers: int = None): + num_workers: int = 1): """This function writes data subsets iteratively in a chunkwise manner, to ensure only at most `sample_chunksize` samples are loaded at a time. @@ -152,7 +152,6 @@ def transform_and_write_data(data: AnnData, chunk_number: int): num_workers)) if num_workers else sample_chunksize # Execute parallel jobs for transformation and witing of data. - # In case of `num_workers = None`, single process is used. parallel = Parallel(n_jobs=num_workers) parallel( delayed(transform_and_write_data)( From 691d811a5e9eabffb88e79478bbb76b32983ee39 Mon Sep 17 00:00:00 2001 From: Saiyam jogani <70364482+Saiyam26@users.noreply.github.com> Date: Thu, 16 Jan 2025 16:43:42 +0530 Subject: [PATCH 13/14] Update scalr/utils/file_utils.py comments addressed Co-authored-by: anand-infocusp --- scalr/utils/file_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py index b097479..b518ad8 100644 --- a/scalr/utils/file_utils.py +++ b/scalr/utils/file_utils.py @@ -100,7 +100,7 @@ def write_chunkwise_data(full_data: Union[AnnData, AnnCollection], only a subset of features.dataframe. Defaults to all features. transform (function): a function to apply a transformation on a chunked numpy array. - num_workers (int): number of jobs to run in parallel for data writing. Additional + num_workers (int): Number of jobs to run in parallel for data writing. Additional workers will not use additional memory, but will be CPU-intensive. 
""" if not path.exists(dirpath): From e3a13a2751fdb28e0379546ed4632af84f60bd7b Mon Sep 17 00:00:00 2001 From: Saiyam jogani <70364482+Saiyam26@users.noreply.github.com> Date: Thu, 16 Jan 2025 16:50:21 +0530 Subject: [PATCH 14/14] comment addressed, added comment --- scalr/feature/feature_subsetting.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scalr/feature/feature_subsetting.py b/scalr/feature/feature_subsetting.py index 22b2806..561d20e 100644 --- a/scalr/feature/feature_subsetting.py +++ b/scalr/feature/feature_subsetting.py @@ -165,6 +165,8 @@ def train_chunked_model(i, start): delayed(train_chunked_model)(i, start) for i, (start) in enumerate( range(0, self.total_features, self.feature_subsetsize))) + # parallel loop returns all models with the chunk number, which is used to sort models in order + # model[1] returns only the model, without the chunk number models = sorted(models) models = [model[1] for model in models] return models