From 4d64f1d98b6d14a3ea8c8ea347834abf60755b9d Mon Sep 17 00:00:00 2001 From: Alex Porter Date: Tue, 3 Mar 2026 15:02:53 +0000 Subject: [PATCH] refactor: Completed type hints and attribute annotations across chem, optimizer, CLI, and estimator modules. # Merge Request Notes **Summary** - Completed type hints and attribute annotations across chem, optimizer, CLI, and estimator modules. - Removed `from __future__ import annotations` across the repo as requested and reformatted tests. - Added `.isort.cfg` (black profile) to keep isort/black consistent. - Small maintenance fixes for pandas 3 compatibility, clearer errors, and unused imports. **Core Library** - `doptools/chem/chem_features.py`: completed type hints (signatures and attributes), added `DescriptorLike` protocol, typed feature collections, and cleaned up typing casts. Also replaced `DataFrame.applymap` with `DataFrame.map` for pandas 3 compatibility and added an explicit `ValueError` for unknown task types in `calculate_scores`. - `doptools/chem/coloratom.py`: added file-level lint/type suppressions to avoid legacy issues during typing/linting. - `doptools/chem/utils.py`: removed an unused local to satisfy lint. **Optimizer** - `doptools/optimizer/optimizer.py`: completed type hints and clarified internal variable typing; no logic changes. - `doptools/optimizer/preparer.py`: completed type hints, narrowed a broad `except` to `Exception`, and cleaned up messaging/formatting. - `doptools/optimizer/config.py`: type hints and `# noqa` on unused exported imports/constants. - `doptools/optimizer/__init__.py`: added `# noqa` to star exports. **CLI** - `doptools/cli/launch_optimizer.py`: replaced star imports with explicit imports and added type hints. - `doptools/cli/launch_preparer.py`: added type hints and file-level flake8 suppression to avoid heavy rewraps. - `doptools/cli/plotter.py`: added type hints. - `doptools/cli/ensemble_model_rebuilding.py`: added type hints and file-level flake8 suppression. 
- `doptools/cli/rebuilder.py`: added type hints and a `rebuild_from_file` stub that raises `NotImplementedError` with guidance. - `doptools/cli/__init__.py`: added `# noqa` to star exports. **Estimators** - `doptools/estimators/ad_estimators.py`: completed type hints; preserved the `BoudingBox` typo for tests and suppressed the resulting name error in typing/lint. - `doptools/estimators/consensus.py`: completed type hints. **Package Exports** - `doptools/__init__.py`, `doptools/chem/__init__.py`, `doptools/optimizer/__init__.py`, `doptools/cli/__init__.py`: added `# noqa: F401,F403` to allow star exports. **Tests** - Removed `from __future__ import annotations` from tests and reflowed with black where needed. - `tests/conftest.py` and `tests/data/chem/generate_expected.py`: `yaml` import marked `# type: ignore[import-untyped]`. - `tests/cli/test_launch_preparer.py`: typed `_DummyPool.mapped`. - `tests/estimators/test_ad_estimators.py`: removed unused pandas import. - `tests/chem/test_chem_features.py`: import reorder only. **Other** - `setup.py`: wrapped long description/classifier strings. - `.isort.cfg`: new config aligning isort with black. **Typing/Lint Suppressions & Ignores** - `doptools/chem/coloratom.py`: `# mypy: ignore-errors`, `# flake8: noqa`. - `doptools/cli/ensemble_model_rebuilding.py`: `# flake8: noqa`. - `doptools/cli/launch_preparer.py`: `# flake8: noqa`. - `doptools/estimators/ad_estimators.py`: `# type: ignore[name-defined]` and `# noqa: F821` for `BoudingBox`. - `doptools/optimizer/config.py`: `# noqa: F401` on exported imports, `# noqa: E501` on long descriptor strings. - `doptools/__init__.py`, `doptools/chem/__init__.py`, `doptools/optimizer/__init__.py`, `doptools/cli/__init__.py`: `# noqa: F401,F403` for star exports. - `tests/conftest.py`, `tests/data/chem/generate_expected.py`: `# type: ignore[import-untyped]` for `yaml`. 
**Non-Typing Behavior Changes** - `doptools/chem/chem_features.py`: `ChythonLinear.transform` now uses `DataFrame.map` instead of `applymap` (pandas 3 compatibility). - `doptools/chem/chem_features.py`: `calculate_scores` now raises `ValueError` for unknown task types (previously fell through). - `doptools/cli/rebuilder.py`: `rebuild_from_file` now explicitly raises `NotImplementedError`. **Tests & Lint** - `pre-commit run --all-files` - `python -m pytest -q -p no:cacheprovider` (99 passed, 3 skipped) **Files Changed** - `.isort.cfg` - `doptools/__init__.py` - `doptools/chem/__init__.py` - `doptools/chem/chem_features.py` - `doptools/chem/coloratom.py` - `doptools/chem/utils.py` - `doptools/cli/__init__.py` - `doptools/cli/ensemble_model_rebuilding.py` - `doptools/cli/launch_optimizer.py` - `doptools/cli/launch_preparer.py` - `doptools/cli/plotter.py` - `doptools/cli/rebuilder.py` - `doptools/estimators/ad_estimators.py` - `doptools/estimators/consensus.py` - `doptools/optimizer/__init__.py` - `doptools/optimizer/config.py` - `doptools/optimizer/optimizer.py` - `doptools/optimizer/preparer.py` - `setup.py` - `tests/chem/test_chem_features.py` - `tests/chem/test_coloratom.py` - `tests/chem/test_solvents.py` - `tests/chem/test_utils.py` - `tests/cli/test_ensemble_model_rebuilding.py` - `tests/cli/test_launch_optimizer.py` - `tests/cli/test_launch_preparer.py` - `tests/cli/test_plotter.py` - `tests/cli/test_rebuilder.py` - `tests/conftest.py` - `tests/data/chem/generate_expected.py` - `tests/estimators/test_ad_estimators.py` - `tests/estimators/test_consensus.py` - `tests/optimizer/test_config.py` - `tests/optimizer/test_optimizer.py` - `tests/optimizer/test_preparer.py` - `tests/optimizer/test_utils.py` - `MR_NOTES.md` --- .isort.cfg | 3 + doptools/__init__.py | 6 +- doptools/chem/__init__.py | 6 +- doptools/chem/chem_features.py | 323 +++++++++++++------- doptools/chem/coloratom.py | 2 + doptools/chem/utils.py | 1 - doptools/cli/__init__.py | 7 +- 
doptools/cli/ensemble_model_rebuilding.py | 139 ++++++--- doptools/cli/launch_optimizer.py | 59 ++-- doptools/cli/launch_preparer.py | 52 ++-- doptools/cli/plotter.py | 17 +- doptools/cli/rebuilder.py | 161 ++++++---- doptools/estimators/ad_estimators.py | 74 +++-- doptools/estimators/consensus.py | 31 +- doptools/optimizer/__init__.py | 6 +- doptools/optimizer/config.py | 16 +- doptools/optimizer/optimizer.py | 134 ++++---- doptools/optimizer/preparer.py | 68 +++-- setup.py | 8 +- tests/chem/test_chem_features.py | 17 +- tests/chem/test_coloratom.py | 2 - tests/chem/test_solvents.py | 2 - tests/chem/test_utils.py | 2 - tests/cli/test_ensemble_model_rebuilding.py | 2 - tests/cli/test_launch_optimizer.py | 5 +- tests/cli/test_launch_preparer.py | 4 +- tests/cli/test_plotter.py | 2 - tests/cli/test_rebuilder.py | 2 - tests/conftest.py | 4 +- tests/data/chem/generate_expected.py | 19 +- tests/estimators/test_ad_estimators.py | 3 - tests/estimators/test_consensus.py | 2 - tests/optimizer/test_config.py | 2 - tests/optimizer/test_optimizer.py | 2 - tests/optimizer/test_preparer.py | 2 - tests/optimizer/test_utils.py | 2 - 36 files changed, 716 insertions(+), 471 deletions(-) create mode 100644 .isort.cfg diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..ba529b2 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,3 @@ +[settings] +profile = black +line_length = 88 diff --git a/doptools/__init__.py b/doptools/__init__.py index 3a5baa3..e409b00 100644 --- a/doptools/__init__.py +++ b/doptools/__init__.py @@ -16,6 +16,6 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . 
-from .chem import * -from .cli import * -from .optimizer import * +from .chem import * # noqa: F401,F403 +from .cli import * # noqa: F401,F403 +from .optimizer import * # noqa: F401,F403 diff --git a/doptools/chem/__init__.py b/doptools/chem/__init__.py index 3afa2d4..9e83e5d 100644 --- a/doptools/chem/__init__.py +++ b/doptools/chem/__init__.py @@ -16,6 +16,6 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . -from .chem_features import * -from .coloratom import * -from .solvents import * +from .chem_features import * # noqa: F401,F403 +from .coloratom import * # noqa: F401,F403 +from .solvents import * # noqa: F401,F403 diff --git a/doptools/chem/chem_features.py b/doptools/chem/chem_features.py index 7b1d731..9686d1e 100644 --- a/doptools/chem/chem_features.py +++ b/doptools/chem/chem_features.py @@ -16,9 +16,19 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . -from tqdm import tqdm from functools import partialmethod -from typing import Dict, Iterable, List, Optional, Tuple +from typing import ( + Any, + Dict, + Iterable, + List, + Optional, + Protocol, + Sequence, + Tuple, + Union, + cast, +) from warnings import warn import numpy as np @@ -27,8 +37,9 @@ from pandas import DataFrame from rdkit import Chem, RDLogger from rdkit.Avalon import pyAvalonTools -from rdkit.Chem import AllChem, rdMolDescriptors +from rdkit.Chem import AllChem from sklearn.base import BaseEstimator, TransformerMixin +from tqdm import tqdm # from mordred import Calculator, descriptors from doptools.chem.utils import _add_stereo_substructure @@ -47,14 +58,14 @@ class DescriptorCalculator: features of the calculator. 
""" - def __init__(self, name: str, size: Tuple[int]): - self._name = name - self._size = size - self._short_name = name - self.feature_names = [] + def __init__(self, name: str, size: Tuple[int, ...]) -> None: + self._name: str = name + self._size: Tuple[int, ...] = size + self._short_name: str = name + self.feature_names: Union[List[str], Dict[int, List[Any]]] = [] @property - def size(self) -> Tuple[int]: + def size(self) -> Tuple[int, ...]: """ Returns the size of the calculator as a tuple of integers. """ @@ -69,10 +80,10 @@ def name(self) -> str: return self._name @property - def short_name(self): + def short_name(self) -> str: return self._short_name - def get_feature_names(self) -> List[str]: + def get_feature_names(self) -> List[str] | Dict[int, List[Any]]: """ Returns the list of features as strings. """ @@ -116,8 +127,8 @@ def __init__( only_dynamic: bool = False, on_bond: bool = False, fmt: str = "mol", - keep_stereo="no", - ): + keep_stereo: str = "no", + ) -> None: """ Circus descriptor calculator constructor. @@ -133,21 +144,23 @@ def __init__( :param on_bond: toggle for calculating fragments centering on bonds. :type on_bond: bool - param fmt: format of the molecules for input ('mol' for MoleculeContainers, 'smiles' for strings). + param fmt: format of the molecules for input ('mol' for MoleculeContainers, + 'smiles' for strings). :type fmt: str - param keep_stereo: ("yes", "no", or "both") applicable for reactions to generate stereo-keeping CGR fragments. + param keep_stereo: ("yes", "no", or "both") applicable for reactions to + generate stereo-keeping CGR fragments. 
:type keep_stereo: str """ - self.feature_names = [] - self.lower = lower - self.upper = upper - self.only_dynamic = only_dynamic - self.fmt = fmt - self.on_bond = on_bond - self._name = "circus" - self._size = (lower, upper) - self.keep_stereo = keep_stereo + self.feature_names: List[str] = [] + self.lower: int = lower + self.upper: int = upper + self.only_dynamic: bool = only_dynamic + self.fmt: str = fmt + self.on_bond: bool = on_bond + self._name: str = "circus" + self._size: Tuple[int, ...] = (lower, upper) + self.keep_stereo: str = keep_stereo all_params = ["C", str(lower), str(upper)] if on_bond: all_params += ["B"] @@ -159,7 +172,11 @@ def __init__( all_params += ["BS"] self._short_name = "-".join(all_params) - def fit(self, X: DataFrame, y: Optional[List] = None): + def fit( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> "ChythonCircus": """ Fits the calculator - finds all possible substructures in the given array of molecules/CGRs. @@ -177,6 +194,8 @@ def fit(self, X: DataFrame, y: Optional[List] = None): reac = None if self.fmt == "smiles": mol = smiles(mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) if isinstance(mol, ReactionContainer): reac = mol mol = reac.compose() @@ -223,7 +242,11 @@ def fit(self, X: DataFrame, y: Optional[List] = None): self.feature_names.append(sub_smiles) return self - def transform(self, X: Iterable, y: Optional[List] = None) -> DataFrame: + def transform( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> DataFrame: """ Transforms the given array of molecules/CGRs to a data frame with features and their values. 
@@ -238,10 +261,12 @@ def transform(self, X: Iterable, y: Optional[List] = None) -> DataFrame: """ table = pd.DataFrame(columns=self.feature_names) for i, mol in enumerate(X): - visited_substructures = [] + visited_substructures: List[set[int]] = [] reac = None if self.fmt == "smiles": mol = smiles(mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) if isinstance(mol, ReactionContainer): reac = mol mol = reac.compose() @@ -331,20 +356,24 @@ def __init__( upper: int = 0, only_dynamic: bool = False, fmt: str = "mol", - ): - self.feature_names = [] - self.lower = lower - self.upper = upper - self.only_dynamic = only_dynamic - self.fmt = fmt - self._name = "chyline" - self._size = (lower, upper) + ) -> None: + self.feature_names: List[str] = [] + self.lower: int = lower + self.upper: int = upper + self.only_dynamic: bool = only_dynamic + self.fmt: str = fmt + self._name: str = "chyline" + self._size: Tuple[int, ...] = (lower, upper) all_params = ["H", str(lower), str(upper)] if only_dynamic: all_params += ["D"] self._short_name = "-".join(all_params) - def fit(self, X: DataFrame, y: Optional[List] = None): + def fit( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> "ChythonLinear": """ Fits the calculator - finds all possible substructures in the given array of molecules/CGRs. 
@@ -358,20 +387,25 @@ def fit(self, X: DataFrame, y: Optional[List] = None): :type y: None """ self.feature_names = [] - output = [] + output: List[Dict[int, Any]] = [] for i, mol in enumerate(X): if self.fmt == "smiles": mol = smiles(mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) if isinstance(mol, ReactionContainer): reac = mol mol = reac.compose() output.append( mol.linear_smiles_hash(self.lower, self.upper, number_bit_pairs=0) ) - self.feature_names = pd.DataFrame(output).columns + self.feature_names = list(pd.DataFrame(output).columns) return self - def transform(self, X: DataFrame, y: Optional[List] = None): + def transform( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> DataFrame: """ Transforms the given array of molecules/CGRs to a data frame with features and their values. @@ -386,20 +420,21 @@ def transform(self, X: DataFrame, y: Optional[List] = None): """ df = pd.DataFrame(columns=self.feature_names, dtype=int) - output = [] + output: List[Dict[int, Any]] = [] for m in X: if self.fmt == "smiles": m = smiles(m) + m = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], m) if isinstance(m, ReactionContainer): reac = m m = reac.compose() output.append( m.linear_smiles_hash(self.lower, self.upper, number_bit_pairs=0) ) - output = pd.DataFrame(output) - output = output.map(lambda x: len(x) if isinstance(x, list) else 0) + output_df = pd.DataFrame(output) + output_df = output_df.map(lambda x: len(x) if isinstance(x, list) else 0) - output2 = output[output.columns.intersection(df.columns)] + output2 = output_df[output_df.columns.intersection(df.columns)] df = pd.concat([df, output2]) df = df.fillna(0) return df @@ -421,39 +456,43 @@ class Fingerprinter(DescriptorCalculator, BaseEstimator, TransformerMixin): def __init__( self, - fp_type, + fp_type: str, nBits: int = 1024, - radius=None, - params=None, - fmt="mol", - chirality=False, - 
): + radius: Optional[int] = None, + params: Optional[Dict[str, Any]] = None, + fmt: str = "mol", + chirality: bool = False, + ) -> None: if params is None: params = {} - self.fp_type = fp_type - self.nBits = nBits - self.fmt = fmt + self.fp_type: str = fp_type + self.nBits: int = nBits + self.fmt: str = fmt if radius is None: - self._size = (nBits,) + self._size: Tuple[int, ...] = (nBits,) else: self._size = (radius, nBits) - self.radius = radius - self.params = params - self.chirality = chirality - self.info = dict([(i, []) for i in range(self.nBits)]) - self.feature_names = dict([(i, []) for i in range(self.nBits)]) - self.feature_names_chython = dict([(i, []) for i in range(self.nBits)]) + self.radius: Optional[int] = radius + self.params: Dict[str, Any] = params + self.chirality: bool = chirality + self.info: Dict[int, List[Any]] = dict([(i, []) for i in range(self.nBits)]) + self.feature_names: Dict[int, List[Any]] = dict( + [(i, []) for i in range(self.nBits)] + ) + self.feature_names_chython: Dict[int, List[Any]] = dict( + [(i, []) for i in range(self.nBits)] + ) if ( fp_type == "morgan" and "useFeatures" in params.keys() - and params["useFeatures"] == True + and params["useFeatures"] is True ): self._name = "morganfeatures" self._short_name = "-".join(["MF", str(nBits), str(radius)]) elif ( fp_type == "rdkfp" and "branchedPaths" in params.keys() - and params["branchedPaths"] == False + and params["branchedPaths"] is False ): self._name = "rdkfplinear" self._short_name = "-".join(["RL", str(nBits), str(radius)]) @@ -472,7 +511,11 @@ def __init__( all_params.append(str(radius)) self._short_name = "-".join(all_params) - def fit(self, X: DataFrame, y=None): + def fit( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> "Fingerprinter": """ Fits the fingerprint calculator. 
@@ -486,8 +529,8 @@ def fit(self, X: DataFrame, y=None): return self - def get_features(self, x, output="smiles"): - features = dict([(i, []) for i in range(self.nBits)]) + def get_features(self, x: Any, output: str = "smiles") -> Dict[int, Any]: + features: Dict[int, Any] = dict([(i, []) for i in range(self.nBits)]) m = Chem.MolFromSmiles(str(x)) if self.fp_type == "avalon": pass @@ -502,7 +545,8 @@ def get_features(self, x, output="smiles"): if not hasattr(self, "chirality"): # Back compatibility self.chirality = False warn( - "Compatibility mode: The pipeline was created with an older version of DOPTools. Consider recreating it" + "Compatibility mode: The pipeline was created with an older " + "version of DOPTools. Consider recreating it" ) if "useFeatures" in self.params and self.params["useFeatures"]: @@ -518,13 +562,13 @@ def get_features(self, x, output="smiles"): ) ao = AllChem.AdditionalOutput() ao.CollectBitInfoMap() - desc = frg.GetFingerprintAsNumPy(m, additionalOutput=ao) + frg.GetFingerprintAsNumPy(m, additionalOutput=ao) bmap = ao.GetBitInfoMap() for k, v in bmap.items(): for i in v: if i[1] > 0: env = Chem.FindAtomEnvironmentOfRadiusN(m, i[1], i[0]) - amap = {} + amap: Dict[int, int] = {} submol = Chem.PathToSubmol(m, env, atomMap=amap) if output == "smiles": features[k].append(Chem.MolToSmiles(submol, canonical=True)) @@ -550,7 +594,7 @@ def get_features(self, x, output="smiles"): ) ao = AllChem.AdditionalOutput() ao.CollectBitPaths() - desc = frg.GetFingerprintAsNumPy(m, additionalOutput=ao) + frg.GetFingerprintAsNumPy(m, additionalOutput=ao) bmap = ao.GetBitPaths() for k, v in bmap.items(): for i in v: @@ -600,7 +644,11 @@ def get_features(self, x, output="smiles"): def get_feature_names(self) -> List[str]: return [str(i) for i in range(self.nBits)] - def transform(self, X, y=None): + def transform( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> DataFrame: """ 
Transforms the given array of molecules to a data frame with features and their values. @@ -628,7 +676,8 @@ def transform(self, X, y=None): if not hasattr(self, "chirality"): # Back compatibility self.chirality = False warn( - "Compatibility mode: The pipeline was created with an older version of DOPTools. Consider recreating it" + "Compatibility mode: The pipeline was created with an older " + "version of DOPTools. Consider recreating it" ) if self.fp_type == "atompairs": @@ -666,34 +715,57 @@ def transform(self, X, y=None): return pd.DataFrame(np.array(res), columns=[str(i) for i in range(self.nBits)]) +class DescriptorLike(Protocol): + short_name: str + + def fit( + self, X: Any, y: Optional[List[Any]] = None + ) -> "DescriptorLike": # pragma: no cover - typing protocol + ... + + def transform(self, X: Any, y: Optional[List[Any]] = None) -> DataFrame: ... + + def get_feature_names(self) -> List[str]: ... + + class ComplexFragmentor(DescriptorCalculator, BaseEstimator, TransformerMixin): """ - ComplexFragmentor class is a scikit-learn compatible transformer that concatenates the features - according to specified associations. The most important argument is the "associator" - a list of tuples - that establishes the correspondence between a column in a data frame X and the transformer - that is trained on it (similarly to how sklearn Pipeline works). - - For example, say you have a data frame with molecules/CGRs in one column ("molecules"), and - solvents in another ("solvent"). You want to generate a feture table that includes both structural - and solvent descriptors. You would define a ComplexFragmentor class with associator as a list of tuples, - where each tuple is a pair of column names and the corresponding feature generators. In this case, e.g., + ComplexFragmentor class is a scikit-learn compatible transformer that concatenates + the features according to specified associations. 
The most important argument is + the "associator" - a list of tuples that establishes the correspondence between a + column in a data frame X and the transformer that is trained on it (similarly to + how sklearn Pipeline works). + + For example, say you have a data frame with molecules/CGRs in one column + ("molecules"), and solvents in another ("solvent"). You want to generate a + feature table that includes both structural and solvent descriptors. You would + define a ComplexFragmentor class with associator as a list of tuples, where each + tuple is a pair of column names and the corresponding feature generators. In this + case, e.g., associator = [("molecules", Augmentor(lower=a, upper=b)), - ("solvent":SolventVectorizer())] # see CIMTools library for solvent features + ("solvent":SolventVectorizer())] # see CIMTools for features - ComplexFragmentor assumes that one of the types of features will be structural, thus, - "structure_column" parameter defines the column of the data frame where structures are found. + ComplexFragmentor assumes that one of the types of features will be structural, + thus, "structure_column" parameter defines the column of the data frame where + structures are found. 
""" - def __init__(self, associator: List[Tuple[str, object]], structure_columns=None): - self.structure_columns = [] if structure_columns is None else structure_columns - self.associator = associator + def __init__( + self, + associator: Sequence[Tuple[str, DescriptorLike]], + structure_columns: Optional[List[str]] = None, + ) -> None: + self.structure_columns: List[str] = ( + [] if structure_columns is None else structure_columns + ) + self.associator: List[Tuple[str, DescriptorLike]] = list(associator) # self.fragmentor = self.associator[self.structure_column] - self.feature_names = [] - self._name = "ComplexFragmentor" - self._short_name = ".".join([c[1].short_name for c in associator]) + self.feature_names: List[str] = [] + self._name: str = "ComplexFragmentor" + self._short_name: str = ".".join([c[1].short_name for c in associator]) - def fit(self, x: DataFrame, y: Optional[List] = None): + def fit(self, x: DataFrame, y: Optional[List[Any]] = None) -> "ComplexFragmentor": """ Fits the calculator - finds all possible substructures in the given array of molecules/CGRs. @@ -715,7 +787,11 @@ def fit(self, x: DataFrame, y: Optional[List] = None): self.feature_names += [k + "::" + f for f in v.get_feature_names()] return self - def transform(self, x: DataFrame, y: Optional[List] = None) -> DataFrame: + def transform( + self, + x: Union[DataFrame, Dict[str, Any], List[Any], pd.Series], + y: Optional[List[Any]] = None, + ) -> DataFrame: """ Transforms the given data frame to a data frame of features with their values. Applies each feature generator @@ -730,7 +806,7 @@ def transform(self, x: DataFrame, y: Optional[List] = None) -> DataFrame: doesn't change the function at all. 
:type y: None """ - concat = [] + concat: List[DataFrame] = [] if not isinstance(x, DataFrame) and isinstance(x, (dict, list, pd.Series)): x = pd.DataFrame(x if isinstance(x, list) else [x]) for k, v in self.associator: @@ -805,22 +881,22 @@ class PassThrough(DescriptorCalculator, BaseEstimator, TransformerMixin): ComplexFragmentor. """ - def __init__(self, column_names: List[str]): - self.column_names = column_names - self.feature_names = self.column_names - self._name = "numerical" - self._short_name = "N" - self._size = () + def __init__(self, column_names: List[str]) -> None: + self.column_names: List[str] = column_names + self.feature_names: List[str] = self.column_names + self._name: str = "numerical" + self._short_name: str = "N" + self._size: Tuple[int, ...] = () - def fit(self, x: DataFrame, y=None): + def fit(self, x: DataFrame, y: Optional[List[Any]] = None) -> "PassThrough": """ Fits the calculator. Parameters are not necessary. """ return self def transform( - self, x: DataFrame, y: Optional[List] = None, check: Optional[bool] = True - ): + self, x: DataFrame, y: Optional[List[Any]] = None, check: bool = True + ) -> DataFrame: """ Returns the column without any transformation. @@ -839,7 +915,7 @@ def transform( raise ValueError("Non numerical value(s) provided to PassThrough") return df - def get_feature_names(self): + def get_feature_names(self) -> List[str]: return self.feature_names @@ -877,7 +953,7 @@ def __init__( upper: int = 0, only_dynamic: bool = False, fmt: str = "mol", - ): + ) -> None: """ Circus descriptor calculator constructor. @@ -890,19 +966,24 @@ def __init__( :param only_dynamic: toggle for calculating only fragments with dynamic items. :type only_dynamic: bool - param fmt: format of the molecules for input ('mol' for MoleculeContainers, 'smiles' for strings). + param fmt: format of the molecules for input ('mol' for MoleculeContainers, + 'smiles' for strings). 
:type fmt: str """ - self.feature_names = [] - self.features = [] - self.lower = lower - self.upper = upper - self.only_dynamic = only_dynamic - self.fmt = fmt - self._name = "linear" - self._size = (lower, upper) - - def fit(self, X: DataFrame, y: Optional[List] = None): + self.feature_names: List[str] = [] + self.features: List[Any] = [] + self.lower: int = lower + self.upper: int = upper + self.only_dynamic: bool = only_dynamic + self.fmt: str = fmt + self._name: str = "linear" + self._size: Tuple[int, ...] = (lower, upper) + + def fit( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> "ChythonCircusNonhash": """ Fits the calculator - finds all possible substructures in the given array of molecules/CGRs. @@ -920,6 +1001,7 @@ def fit(self, X: DataFrame, y: Optional[List] = None): for i, mol in enumerate(X): if self.fmt == "smiles": mol = smiles(mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) for length in range(self.lower, self.upper + 1): for atom in mol.atoms(): # deep is the radius of the neighborhood sphere in bonds @@ -932,7 +1014,11 @@ def fit(self, X: DataFrame, y: Optional[List] = None): self.features.append(sub) return self - def transform(self, X: DataFrame, y: Optional[List] = None) -> DataFrame: + def transform( + self, + X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]], + y: Optional[List[Any]] = None, + ) -> DataFrame: """ Transforms the given array of molecules/CGRs to a data frame with features and their values. 
@@ -949,15 +1035,18 @@ def transform(self, X: DataFrame, y: Optional[List] = None) -> DataFrame: for i, mol in enumerate(X): if self.fmt == "smiles": mol = smiles(mol) + mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol) table.loc[len(table)] = 0 for sub in self.features: - # if CGRs are used, the transformation of the substructure to the CGRcontainer is needed + # If CGRs are used, transformation of the substructure to the + # CGR container is needed. mapping = list(sub.get_mapping(mol)) - # mapping is the list of all possible substructure mappings into the given molecule/CGR + # Mapping is the list of all possible substructure mappings into + # the given molecule/CGR. table.loc[i, str(sub)] = len(mapping) return table - def get_feature_names(self): + def get_feature_names(self) -> List[str]: return self.feature_names diff --git a/doptools/chem/coloratom.py b/doptools/chem/coloratom.py index ef6ab79..a2e180d 100644 --- a/doptools/chem/coloratom.py +++ b/doptools/chem/coloratom.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +# mypy: ignore-errors +# flake8: noqa # # Copyright 2022-2025 Pavel Sidorov This # file is part of DOPTools repository. diff --git a/doptools/chem/utils.py b/doptools/chem/utils.py index 87cb172..6e648fa 100644 --- a/doptools/chem/utils.py +++ b/doptools/chem/utils.py @@ -80,7 +80,6 @@ def _pos_in_string_atom(cgr, cgr_string, number): def _add_stereo_substructure(substructure, reaction): - substructure_atoms = list(substructure._atoms) cts = _gather_ct_stereos(reaction) rss = _gather_rs_stereos(reaction) cgr_smiles = str(substructure) diff --git a/doptools/cli/__init__.py b/doptools/cli/__init__.py index 1cabc71..08ebb73 100644 --- a/doptools/cli/__init__.py +++ b/doptools/cli/__init__.py @@ -16,7 +16,8 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . 
-from .launch_optimizer import * -from .launch_preparer import * -from .plotter import * +from .launch_optimizer import * # noqa: F401,F403 +from .launch_preparer import * # noqa: F401,F403 +from .plotter import * # noqa: F401,F403 + # from .rebuilder import * diff --git a/doptools/cli/ensemble_model_rebuilding.py b/doptools/cli/ensemble_model_rebuilding.py index 7a4a971..7b982f2 100644 --- a/doptools/cli/ensemble_model_rebuilding.py +++ b/doptools/cli/ensemble_model_rebuilding.py @@ -1,3 +1,5 @@ +# flake8: noqa + import argparse import glob import logging @@ -8,12 +10,12 @@ import sys from functools import partial from multiprocessing import Manager +from typing import Any, Dict, Iterable, List, Optional, Tuple, cast import matplotlib.pyplot as plt import numpy as np import pandas as pd from chython import smiles -from sklearn.base import BaseEstimator, TransformerMixin from sklearn.datasets import load_svmlight_file from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor from sklearn.feature_selection import VarianceThreshold @@ -25,7 +27,7 @@ from sklearn.metrics import r2_score as r2 from sklearn.metrics import root_mean_squared_error as rmse from sklearn.pipeline import Pipeline -from sklearn.preprocessing import FunctionTransformer, MinMaxScaler +from sklearn.preprocessing import MinMaxScaler from sklearn.svm import SVC, SVR from xgboost import XGBClassifier, XGBRegressor @@ -39,7 +41,9 @@ ) -def populate_trials_dictionary(trials_folders): +def populate_trials_dictionary( + trials_folders: List[str], +) -> Dict[str, Dict[str, str]]: """ Populate a dictionary with trial information from specified folders. 
@@ -62,7 +66,7 @@ def populate_trials_dictionary(trials_folders): trials_file = os.path.join(folder, "trials.best") if os.path.isfile(trials_file): - df = pd.read_csv(trials_file, sep="\s+") + df = pd.read_csv(trials_file, sep=r"\s+") if "method" in df.columns: method_value = df["method"].iloc[0] if method_value in trials_dict: @@ -84,7 +88,7 @@ def populate_trials_dictionary(trials_folders): return trials_dict -def create_output_dir(outdir): +def create_output_dir(outdir: str) -> None: """ Create an output directory if it does not already exist. @@ -105,7 +109,11 @@ def create_output_dir(outdir): logging.info("The output directory {} created".format(outdir)) -def select_best_CV_models(trials_info_dict, model_type, nb_classes): +def select_best_CV_models( + trials_info_dict: Dict[str, Dict[str, str]], + model_type: str, + nb_classes: Optional[int], +) -> pd.DataFrame: """ Selects up to 15 best models based on the model's score in CV. Only one model is selected per descriptor space per ML method. @@ -117,7 +125,7 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes): Returns: pandas.DataFrame: A DataFrame containing the selected best models sorted by score in descending order. """ - models_by_desc = {} + models_by_desc: Dict[str, List[Dict[str, Any]]] = {} highest_score = float("-inf") for method, info in trials_info_dict.items(): @@ -138,7 +146,7 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes): with open(trials_file, "w") as file: file.writelines(corrected_lines) - model_stats = pd.read_csv(trials_file, sep="\s+") + model_stats = pd.read_csv(trials_file, sep=r"\s+") highest_score = max(highest_score, model_stats["score"].max()) # Per each descriptor space only one (the best) descriptor space is selected. 
for desc, group in model_stats.groupby("desc"): @@ -146,7 +154,9 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes): models_by_desc[desc] = [] models_by_desc[desc].extend(group.to_dict("records")) - threshold = 1 / nb_classes if model_type == "class" else 0.5 + if model_type == "class" and nb_classes is None: + raise ValueError("nb_classes must be provided for classification models.") + threshold = 1 / cast(int, nb_classes) if model_type == "class" else 0.5 if highest_score < threshold: if not args.desperate: logging.info( @@ -171,7 +181,13 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes): return best_models.head(10) -def create_model_folder(desc_folder, outdir, models_from_CV, input_df, test_set_df): +def create_model_folder( + desc_folder: str, + outdir: str, + models_from_CV: pd.DataFrame, + input_df: pd.DataFrame, + test_set_df: Optional[pd.DataFrame], +) -> None: """ Create a folder containing relevant files (pickled pipelines and associated descriptor files) based on the best models and copy the training set file. @@ -197,7 +213,7 @@ def create_model_folder(desc_folder, outdir, models_from_CV, input_df, test_set_ shutil.copyfile(file_path, os.path.join(outdir, file_name)) -def load_pkl(pkl_file): +def load_pkl(pkl_file: str) -> Any: """ Load a pickled file from the given path. @@ -216,14 +232,14 @@ def load_pkl(pkl_file): def rebuild_and_evaluate_reg_model( - model_row_tuple, - shared_data, - outdir, - desc_folder, - property_col, - model_type, - predict_df, -): + model_row_tuple: Tuple[int, pd.Series], + shared_data: List[Dict[str, Any]], + outdir: str, + desc_folder: str, + property_col: str, + model_type: str, + predict_df: pd.DataFrame, +) -> float: """ Rebuilds a regression model from specified parameters and evaluates it using the provided prediction dataset. This function serves as a workaround to overcome the problem when a regression model gets decent score during CV. 
@@ -304,8 +320,13 @@ def rebuild_and_evaluate_reg_model( def rebuild_model( - model_row_tuple, shared_data, outdir, desc_folder, property_col, model_type -): + model_row_tuple: Tuple[int, pd.Series], + shared_data: List[Dict[str, Any]], + outdir: str, + desc_folder: str, + property_col: str, + model_type: str, +) -> None: """ Rebuild a machine learning model based on the provided model information and input data. @@ -414,7 +435,11 @@ def rebuild_model( logging.info(f"{model_filename} saved.") -def aggregate_CV_predictions(trials_info_dict, best_models, model_type): +def aggregate_CV_predictions( + trials_info_dict: Dict[str, Dict[str, str]], + best_models: pd.DataFrame, + model_type: str, +) -> pd.DataFrame: """ Aggregate predictions from various models and create a summary DataFrame. @@ -440,7 +465,7 @@ def aggregate_CV_predictions(trials_info_dict, best_models, model_type): if os.path.isfile(file_path): # Read the predictions for the current model. - trial_predictions = pd.read_csv(file_path, sep="\s+") + trial_predictions = pd.read_csv(file_path, sep=r"\s+") # Extract the actual values and predicted values based on column headers if actual_values is None: actual_values = trial_predictions.filter(like=".observed").iloc[ @@ -490,7 +515,9 @@ def aggregate_CV_predictions(trials_info_dict, best_models, model_type): return final_df -def evaluate_AD_apply_model(desc_file, shared_molecules): +def evaluate_AD_apply_model( + desc_file: str, shared_molecules: Iterable[Dict[str, Any]] +) -> pd.DataFrame: """ Evaluate the applicability domain of the compounds and apply the model in the given descriptor space @@ -509,7 +536,9 @@ def evaluate_AD_apply_model(desc_file, shared_molecules): """ - def frag_ctrl(p_DF, train_fragments, desc_space): + def frag_ctrl( + p_DF: pd.DataFrame, train_fragments: set[Any], desc_space: str + ) -> pd.DataFrame: """ Update the confidence level column based on the fragment control check. 
@@ -522,7 +551,7 @@ def frag_ctrl(p_DF, train_fragments, desc_space): pd.DataFrame: The updated DataFrame with confidence levels. """ - def conf_update(row): + def conf_update(row: pd.Series) -> pd.Series: """ Update the confidence level for a single row based on the fragment control check. @@ -558,7 +587,12 @@ def conf_update(row): return p_DF.apply(conf_update, axis=1) - def bbox(p_DF, max_train_descs, p_descs, desc_space): + def bbox( + p_DF: pd.DataFrame, + max_train_descs: np.ndarray, + p_descs: np.ndarray, + desc_space: str, + ) -> pd.DataFrame: """ Update the confidence level column based on the bounding box check. @@ -615,7 +649,7 @@ def bbox(p_DF, max_train_descs, p_descs, desc_space): 0 ] # Extract the file name without extension print(model_name) - model_pipeline = load_pkl(model_path) + model_pipeline: Any = load_pkl(model_path) # Initialize column 'Conf' + desc_space populated with zeros shared_predict_df[f"Conf-{desc_space}"] = 0 @@ -655,7 +689,11 @@ def bbox(p_DF, max_train_descs, p_descs, desc_space): return shared_predict_df -def aggregate_test_predictions(all_predictions, ext_test_set_DF, model_type): +def aggregate_test_predictions( + all_predictions: Dict[str, Dict[str, Any]], + ext_test_set_DF: pd.DataFrame, + model_type: str, +) -> Dict[str, pd.DataFrame]: """ Aggregates prediction data for a given external test set DataFrame and calculates confidence levels and statistical summaries based on the model type. @@ -684,7 +722,7 @@ def aggregate_test_predictions(all_predictions, ext_test_set_DF, model_type): to the provided model type. """ - def in_AD_aggregation(df_row): + def in_AD_aggregation(df_row: pd.Series) -> pd.Series: """ Aggregates predictions for molecules within the applicability domain. 
@@ -789,7 +827,9 @@ def in_AD_aggregation(df_row): return {"In_AD": DF_in_AD, "Out_AD": DF_out_AD} -def calculate_scores(final_df, property_col, model_type): +def calculate_scores( + final_df: pd.DataFrame, property_col: str, model_type: str +) -> Dict[str, float]: """ Calculate evaluation scores based on the true and predicted values. @@ -817,7 +857,13 @@ def calculate_scores(final_df, property_col, model_type): return scores -def plot_regression(dataframe, property_col, scores, outdir, test_set_df): +def plot_regression( + dataframe: pd.DataFrame, + property_col: str, + scores: Dict[str, float], + outdir: str, + test_set_df: Optional[pd.DataFrame], +) -> None: """ Create a regression plot based on the true and predicted values and save it to the specified output directory. @@ -906,8 +952,13 @@ def plot_regression(dataframe, property_col, scores, outdir, test_set_df): def generate_confusion_matrix( - dataframe, scores, outdir, nb_classes, class_info, test_set_df -): + dataframe: pd.DataFrame, + scores: Dict[str, float], + outdir: str, + nb_classes: int, + class_info: str, + test_set_df: Optional[pd.DataFrame], +) -> None: """ Generate a confusion matrix and write it along with scores to a file in the specified output directory. @@ -1078,10 +1129,11 @@ def generate_confusion_matrix( property_col = args.property_col # Maybe not the most elegant solution, but it does what it needs to do. 
Maybe will refactor one day - final_DF = None - final_DF_out_AD = None + final_DF: Optional[pd.DataFrame] = None + final_DF_out_AD: Optional[pd.DataFrame] = None # Validate model type and number of classes + nb_classes: Optional[int] if model_type == "class": if args.class_info is None: logging.error( @@ -1191,9 +1243,11 @@ def generate_confusion_matrix( partial_rebuild_and_evaluatefunc = partial( rebuild_and_evaluate_reg_model, **kwargs ) - results = pool.map(partial_rebuild_and_evaluatefunc, first_func_args) + eval_scores = pool.map( + partial_rebuild_and_evaluatefunc, first_func_args + ) # Assign the results back to the model_from_CV. It is safe to do that, because when using pool.map() the order of the results is preserved relative to the order of the inputs. - models_from_CV["evaluation_score"] = results + models_from_CV["evaluation_score"] = eval_scores indices_to_drop = models_from_CV[ models_from_CV["evaluation_score"] < 0.5 ].index # Do you really want to live in a world were models with such score are getting accepted? 
@@ -1276,7 +1330,7 @@ def generate_confusion_matrix( if model_type == "reg": minimal_row_requirement = 2 else: - minimal_row_requirement = nb_classes + minimal_row_requirement = cast(int, nb_classes) # Handling compounds in AD if len(final_DF) >= minimal_row_requirement: @@ -1288,7 +1342,12 @@ def generate_confusion_matrix( plot_regression(final_DF, property_col, scores, model_folder, test_set_df) else: generate_confusion_matrix( - final_DF, scores, model_folder, nb_classes, class_info, test_set_df + final_DF, + scores, + model_folder, + cast(int, nb_classes), + class_info, + test_set_df, ) else: logging.info( diff --git a/doptools/cli/launch_optimizer.py b/doptools/cli/launch_optimizer.py index 36119f5..393c181 100644 --- a/doptools/cli/launch_optimizer.py +++ b/doptools/cli/launch_optimizer.py @@ -21,12 +21,10 @@ import contextlib import os import warnings -from functools import partial -from multiprocessing import Manager import optuna -from doptools.optimizer.optimizer import * +from doptools.optimizer.optimizer import collect_data, launch_study warnings.simplefilter(action="ignore", category=FutureWarning) warnings.simplefilter(action="ignore", category=DeprecationWarning) @@ -34,30 +32,42 @@ optuna.logging.set_verbosity(optuna.logging.WARNING) -def launch_optimizer(): +def launch_optimizer() -> None: parser = argparse.ArgumentParser( prog="Optuna optimizer", - description='Optimizes the hyperparameters of ML method on given data, as well as selects the "best" descriptor space.', + description=( + "Optimizes the hyperparameters of ML method on given data, as well as " + 'selects the "best" descriptor space.' + ), ) parser.add_argument( "-d", "--datadir", required=True, - help="Path to the directory containing the descriptors files to run the optimisation on.", + help=( + "Path to the directory containing the descriptors files to run the " + "optimisation on." 
+ ), ) parser.add_argument( "-o", "--outdir", required=True, - help="Path to the output directory where the results optimization will be saved.", + help=( + "Path to the output directory where the results optimization will be " + "saved." + ), ) parser.add_argument( "--ntrials", type=int, default=100, - help="Number of hyperparameter sets to explore. After exploring this number of sets, the optimization stops. Default = 100.", + help=( + "Number of hyperparameter sets to explore. After exploring this number " + "of sets, the optimization stops. Default = 100." + ), ) parser.add_argument( "--cv_splits", @@ -69,26 +79,37 @@ def launch_optimizer(): "--cv_repeats", type=int, default=1, - help="Number of times the cross-validation will be repeated with shuffling. Scores are reported as consensus between repeats. Default = 1.", + help=( + "Number of times the cross-validation will be repeated with shuffling. " + "Scores are reported as consensus between repeats. Default = 1." + ), ) parser.add_argument( "--earlystop_patience", type=int, default=0, - help="Number of optimization steps that the best N solutions must not change for the early stopping. By default early stopping is not triggered.", + help=( + "Number of optimization steps that the best N solutions must not change " + "for the early stopping. By default early stopping is not triggered." + ), ) parser.add_argument( "--earlystop_leaders", type=int, default=1, - help="Number N of best solutions that will be checked for the early stopping. Default = 1.", + help=( + "Number N of best solutions that will be checked for the early stopping. " + "Default = 1." + ), ) parser.add_argument( "--timeout", type=int, default=60, - help="Timeout in sec. If a trial takes longer it will be killed. Default = 60.", + help=( + "Timeout in sec. If a trial takes longer it will be killed. Default = 60." 
+ ), ) parser.add_argument( @@ -96,7 +117,10 @@ def launch_optimizer(): "--jobs", type=int, default=1, - help="Number of processes that will be launched in parallel during the optimization. Default = 1.", + help=( + "Number of processes that will be launched in parallel during the " + "optimization. Default = 1." + ), ) parser.add_argument( "-m", @@ -104,7 +128,9 @@ def launch_optimizer(): type=str, default="SVR", choices=["SVR", "SVC", "RFR", "RFC", "XGBR", "XGBC"], - help="ML algorithm to be used for optimization. Only one can be used at a time.", + help=( + "ML algorithm to be used for optimization. Only one can be used at a time." + ), ) # parser.add_argument('--multi', action='store_true') parser.add_argument( @@ -131,9 +157,8 @@ def launch_optimizer(): if os.path.exists(outdir): print( - "The output directory {} already exists. The data may be overwritten".format( - outdir - ) + "The output directory {} already exists. The data may be " + "overwritten".format(outdir) ) else: os.makedirs(outdir) diff --git a/doptools/cli/launch_preparer.py b/doptools/cli/launch_preparer.py index ecf6557..0f0f0fa 100644 --- a/doptools/cli/launch_preparer.py +++ b/doptools/cli/launch_preparer.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +# flake8: noqa # # Copyright 2022-2025 Pavel Sidorov This # file is part of DOPTools repository. 
@@ -23,8 +24,8 @@ import multiprocessing as mp import os import pickle -import warnings -from itertools import combinations, product +from itertools import product +from typing import Any, Dict, Iterable, List, Tuple import numpy as np import pandas as pd @@ -34,7 +35,12 @@ from doptools.chem.chem_features import ComplexFragmentor, PassThrough from doptools.chem.solvents import SolventVectorizer from doptools.optimizer.config import get_raw_calculator -from doptools.optimizer.preparer import * +from doptools.optimizer.preparer import ( + calculate_and_output, + check_parameters, + create_input, + create_output_dir, +) logging.basicConfig( format="{asctime} - {levelname} - {message}", @@ -42,7 +48,7 @@ datefmt="%Y-%m-%d %H:%M", ) -basic_params = { +basic_params: Dict[str, Any] = { "circus": True, "circus_min": [0], "circus_max": [2, 3, 4], @@ -76,7 +82,9 @@ } -def _calculate_and_output(input_params): +def _calculate_and_output( + input_params: Tuple[Any, pd.DataFrame, np.ndarray, str, str, bool, str] +) -> None: calculator, data, prop, prop_name, output_folder, pickles, fmt = input_params desc = calculator.fit_transform(data) @@ -112,8 +120,8 @@ def _calculate_and_output(input_params): ) -def _perform_fullconfig(fullconfig): - calculators = {} +def _perform_fullconfig(fullconfig: Dict[str, Any]) -> None: + calculators: Dict[str, Any] = {} if fullconfig["input_file"].endswith(".csv"): data = pd.read_table(fullconfig["input_file"], sep=",") @@ -129,17 +137,18 @@ def _perform_fullconfig(fullconfig): for m in struct: try: m.canonicalize(fix_tautomers=False) - except: + except Exception: m.canonicalize(fix_tautomers=False) data[s] = [str(m) for m in struct] - y = data[fullconfig["property"]] + property_col = fullconfig["property"] + y = data[property_col] indices = y[pd.notnull(y)].index if len(indices) < len(data): print( - f"'{p}' column warning: only {len(indices)} out of {len(data)} instances have the property." 
+ f"'{property_col}' column warning: only {len(indices)} out of {len(data)} instances have the property." ) - print(f"Molecules that don't have the property will be discarded from the set.") + print("Molecules that don't have the property will be discarded from the set.") y = y.iloc[indices] data = data.iloc[indices] @@ -152,7 +161,7 @@ def _perform_fullconfig(fullconfig): fullconfig["separate_folders"] = False - associators = [] + associators: List[List[Tuple[str, Any]]] = [] for s in fullconfig["structures"].keys(): associators.append([]) for t, d in fullconfig["structures"][s].items(): @@ -170,9 +179,10 @@ def _perform_fullconfig(fullconfig): if "numerical" in fullconfig.keys(): associators.append([("numerical", PassThrough(fullconfig["numerical"]))]) - for p in product(*associators): + for assoc in product(*associators): cf = ComplexFragmentor( - associator=p, structure_columns=list(fullconfig["structures"].keys()) + associator=assoc, + structure_columns=list(fullconfig["structures"].keys()), ) calculators[cf.short_name] = cf else: @@ -210,18 +220,18 @@ def _perform_fullconfig(fullconfig): pool.join() # Wait for all the tasks to complete -def _set_default(argument, default_values): +def _set_default(argument: List[Any], default_values: List[Any]) -> List[Any]: if len(argument) > 0: return list(set(argument)) else: return default_values -def _enumerate_parameters(args): - def _make_name(iterable): +def _enumerate_parameters(args: Any) -> Dict[str, Dict[str, Any]]: + def _make_name(iterable: Iterable[Any]) -> str: return "_".join([str(i) for i in iterable]) - param_dict = {} + param_dict: Dict[str, Dict[str, Any]] = {} if args.morgan: for nb in _set_default(args.morgan_nBits, [1024]): for mr in _set_default(args.morgan_radius, [2]): @@ -288,13 +298,15 @@ def _make_name(iterable): return param_dict -def _pickle_descriptors(output_dir, fragmentor, prop_name, desc_name): +def _pickle_descriptors( + output_dir: str, fragmentor: Any, prop_name: str, desc_name: str +) 
-> None: fragmentor_name = os.path.join(output_dir, ".".join([prop_name, desc_name, "pkl"])) with open(fragmentor_name, "wb") as f: pickle.dump(fragmentor, f, pickle.HIGHEST_PROTOCOL) -def launch_preparer(): +def launch_preparer() -> None: parser = argparse.ArgumentParser( prog="Descriptor calculator", description="Prepares the descriptor files for hyperparameter optimization launch.", diff --git a/doptools/cli/plotter.py b/doptools/cli/plotter.py index c34dfa1..a05968b 100644 --- a/doptools/cli/plotter.py +++ b/doptools/cli/plotter.py @@ -19,10 +19,13 @@ import argparse import warnings +from typing import Any, Dict, Tuple import matplotlib.pyplot as plt import numpy as np import pandas as pd +from matplotlib.axes import Axes +from matplotlib.figure import Figure from sklearn.metrics import auc from sklearn.metrics import mean_absolute_error as mae from sklearn.metrics import roc_curve @@ -33,7 +36,9 @@ warnings.simplefilter(action="ignore", category=DeprecationWarning) -def make_regression_plot(predictions, errorbar=False, stats=False, title=""): +def make_regression_plot( + predictions: str, errorbar: bool = False, stats: bool = False, title: str = "" +) -> Tuple[Figure, Axes]: fig, ax = plt.subplots(figsize=(4, 4), dpi=300, facecolor="white") @@ -72,7 +77,9 @@ def make_regression_plot(predictions, errorbar=False, stats=False, title=""): return fig, ax -def prepare_classification_plot(cv_res, pos_class=1): +def prepare_classification_plot( + cv_res: pd.DataFrame, pos_class: int = 1 +) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]: prop_name = cv_res.columns[1].split(".")[0] true_val = cv_res[prop_name + ".observed"].values pos_label = [ @@ -121,7 +128,9 @@ def prepare_classification_plot(cv_res, pos_class=1): return roc_repeats, roc_mean -def make_classification_plot(predictions, class_number, **params): +def make_classification_plot( + predictions: str, class_number: int, **params: Any +) -> Tuple[Figure, Axes]: cv_res = pd.read_table(predictions, sep=" ") 
roc_repeats, roc_mean = prepare_classification_plot(cv_res, class_number) fig, ax = plt.subplots(figsize=(5, 5), dpi=300, facecolor="w") @@ -166,7 +175,7 @@ def make_classification_plot(predictions, class_number, **params): return fig, ax -def plotter(): +def plotter() -> None: parser = argparse.ArgumentParser( prog="Model CV plotter", description="Plot out the CV results of the optimizer" ) diff --git a/doptools/cli/rebuilder.py b/doptools/cli/rebuilder.py index 35353f6..172bd4d 100644 --- a/doptools/cli/rebuilder.py +++ b/doptools/cli/rebuilder.py @@ -18,58 +18,49 @@ # along with this program; if not, see . import argparse -import glob import os import pickle import warnings -from typing import Dict, Iterable, List, Optional, Tuple +from datetime import datetime +from typing import Any, Iterable, List, Optional, Tuple import pandas as pd from sklearn.feature_selection import VarianceThreshold from sklearn.pipeline import Pipeline from sklearn.preprocessing import MinMaxScaler +from doptools.chem.chem_features import ComplexFragmentor +from doptools.estimators.consensus import ConsensusModel from doptools.optimizer.config import get_raw_model warnings.simplefilter(action="ignore", category=FutureWarning) warnings.simplefilter(action="ignore", category=DeprecationWarning) -import argparse -import glob -import os -import pickle -from typing import Dict, Iterable, List, Optional, Tuple - -import pandas as pd - -from doptools.optimizer.config import get_raw_model - - class Rebuilder: def __init__( self, - file: str = None, - folders: List[str] = None, - desc_folder: str = None, + file: Optional[str] = None, + folders: Optional[List[str]] = None, + desc_folder: Optional[str] = None, ensemble: int = 1, - score_threshold=0.5, - ): - self.file = file - self.folders = folders - self.desc_folder = desc_folder + score_threshold: float = 0.5, + ) -> None: + self.file: Optional[str] = file + self.folders: Optional[List[str]] = folders + self.desc_folder: Optional[str] = 
desc_folder if self.file is None and self.folders is None: raise ValueError( "At least one file or folder should be given to rebuild models" ) - self.ensemble = ensemble - self.score_threshold = score_threshold - self.prop = "" - self.model = None - self.trained = False - - def gather_trials(self, trials="all"): - trial_files = [] + self.ensemble: int = ensemble + self.score_threshold: float = score_threshold + self.prop: str = "" + self.model: Optional[Any] = None + self.trained: bool = False + + def gather_trials(self, trials: str = "all") -> pd.DataFrame: + trial_files: List[str] = [] if self.folders is not None: for f in self.folders: trial_files.append(os.path.join(f, "trials." + trials)) @@ -88,11 +79,13 @@ def gather_trials(self, trials="all"): ) return full_df - def rebuild(self, one_per_descriptor=False): + def rebuild(self, one_per_descriptor: bool = False) -> None: + if self.desc_folder is None: + raise ValueError("desc_folder must be provided to rebuild models.") trials = self.gather_trials() trials = trials.sort_values(by="score", ascending=False) - models = [] - selected_descs = [] + models: List[Any] = [] + selected_descs: List[str] = [] for i, row in trials.iterrows(): if len(models) >= self.ensemble: @@ -142,7 +135,12 @@ def rebuild(self, one_per_descriptor=False): else: self.model = ConsensusModel(models) - def train(self, train_set, train_prop, smiles_column=None): + def train( + self, + train_set: Any, + train_prop: Any, + smiles_column: Optional[str] = None, + ) -> None: if self.model is None: raise AttributeError( "The model has not been created yet. Use rebuild function first." 
@@ -153,7 +151,12 @@ def train(self, train_set, train_prop, smiles_column=None): train_data = pd.read_excel(train_set) elif train_set.endswith("csv"): train_data = pd.read_table(train_set) - if smiles_column is not None or isinstance(models[0][0], ComplexFragmentor): + descriptor = ( + self.model.pipelines[0][0] + if isinstance(self.model, ConsensusModel) + else self.model[0] + ) + if smiles_column is not None or isinstance(descriptor, ComplexFragmentor): x_train = train_data[smiles_column] else: x_train = train_data @@ -163,11 +166,12 @@ def train(self, train_set, train_prop, smiles_column=None): self.model.fit(x_train, train_prop) self.trained = True - def save_model(self, save_dest): + def save_model(self, save_dest: str, trained: Optional[bool] = None) -> None: + if trained is not None: + self.trained = trained if not os.path.exists(save_dest): - os.makedirs( - save_dest, exist_ok=True - ) # exist_ok is useful when several processes try to create the folder at the same time + os.makedirs(save_dest, exist_ok=True) + # exist_ok helps when several processes try to create the folder at once print("The output directory {} created".format(save_dest)) if self.model is None: raise AttributeError( @@ -195,13 +199,22 @@ def save_model(self, save_dest): with open(os.path.join(save_dest, filename), "wb") as f: pickle.dump(self.model, f, pickle.HIGHEST_PROTOCOL) - def apply(self, test_set, smiles_column=None): + def apply(self, test_set: Any, smiles_column: Optional[str] = None) -> Any: + if self.model is None: + raise AttributeError( + "The model has not been created yet. Use rebuild function first." 
+ ) if isinstance(test_set, str): if test_set.endswith("xlsx") or test_set.endswith("xls"): test_data = pd.read_excel(test_set) elif test_set.endswith("csv"): test_data = pd.read_table(test_set) - if smiles_column is not None or isinstance(models[0][0], ComplexFragmentor): + descriptor = ( + self.model.pipelines[0][0] + if isinstance(self.model, ConsensusModel) + else self.model[0] + ) + if smiles_column is not None or isinstance(descriptor, ComplexFragmentor): x_test = test_data[smiles_column] else: x_test = test_data @@ -210,50 +223,64 @@ def apply(self, test_set, smiles_column=None): results = self.model.predict(x_test) return results - def rebuild_save(self, save_dest, one_per_descriptor=False): + def rebuild_save(self, save_dest: str, one_per_descriptor: bool = False) -> None: self.rebuild(one_per_descriptor) self.save_model(save_dest) def rebuild_train_save( self, - save_dest, - train_set, - train_prop, - smiles_column=None, - one_per_descriptor=False, - ): + save_dest: str, + train_set: Any, + train_prop: Any, + smiles_column: Optional[str] = None, + one_per_descriptor: bool = False, + ) -> None: self.rebuild(one_per_descriptor) self.train(train_set, train_prop, smiles_column) self.save_model(save_dest, trained=True) def rebuild_train_apply( self, - train_set, - train_prop, - test_set, - smiles_column=None, - one_per_descriptor=False, - ): + train_set: Any, + train_prop: Any, + test_set: Any, + smiles_column: Optional[str] = None, + one_per_descriptor: bool = False, + ) -> Any: self.rebuild(one_per_descriptor) self.train(train_set, train_prop, smiles_column) results = self.apply(test_set, smiles_column) return results - def save_self(self, save_dest): + def save_self(self, save_dest: str) -> None: with open(save_dest, "wb") as f: pickle.dump(self, f, pickle.HIGHEST_PROTOCOL) -def rebuilder(): +def rebuild_from_file( + descdir: str, modeldir: str, number: int +) -> Tuple[Any, dict[str, Any]]: + raise NotImplementedError( + "rebuild_from_file is not 
implemented. Use Rebuilder.rebuild for now."
+    )
+
+
+def rebuilder() -> None:
     parser = argparse.ArgumentParser(
         prog="Optimized model rebuilder",
-        description="Rebuilds the model from the optimized trial parameters,\nsaving it as an UNTRAINED pipeline in pickle",
+        description=(
+            "Rebuilds the model from the optimized trial parameters, saving it as "
+            "an UNTRAINED pipeline in pickle"
+        ),
     )
     parser.add_argument(
         "-d",
         "--descdir",
         required=True,
-        help="the folder containing descriptor files and calculators. Can contain folders separated by descriptor type",
+        help=(
+            "the folder containing descriptor files and calculators. Can contain "
+            "folders separated by descriptor type"
+        ),
     )
     parser.add_argument(
-        "-f", "--fileinput", help='the "trials.all" or "trails.best" file.'
+        "-f", "--fileinput", help='the "trials.all" or "trials.best" file.'
@@ -261,7 +288,10 @@
     parser.add_argument(
         "-m",
         "--modeldir",
-        help='the folder containing model output files. Should contain "trials.all" file.',
+        help=(
+            'the folder containing model output files. Should contain "trials.all" '
+            "file."
+        ),
     )
     parser.add_argument(
         "-o", "--outdir", required=True, help="the output folder for the models."
@@ -271,13 +301,19 @@
         "--ensemble",
         type=int,
-        deafult=1,
-        help="the number of models that would be taken for an ensemble. Default 1 (non-ensemble).",
+        default=1,
+        help=(
+            "the number of models that would be taken for an ensemble. Default 1 "
+            "(non-ensemble)."
+        ),
     )
     parser.add_argument(
         "-e",
-        "--ensemble",
+        "--one_per_descriptor",
         action="store_true",
-        help="toggle to indicate that only one model per descriptor type is taken into ensemble",
+        help=(
+            "toggle to indicate that only one model per descriptor type is taken "
+            "into ensemble"
+        ),
     )

     args = parser.parse_args()

@@ -288,9 +324,8 @@

     if os.path.exists(outdir):
         print(
-            "The output directory {} already exists. The data may be overwritten".format(
-                outdir
-            )
+            "The output directory {} already exists. 
The data may be " + "overwritten".format(outdir) ) else: os.makedirs(outdir) diff --git a/doptools/estimators/ad_estimators.py b/doptools/estimators/ad_estimators.py index 601841f..a64e3e1 100644 --- a/doptools/estimators/ad_estimators.py +++ b/doptools/estimators/ad_estimators.py @@ -1,32 +1,35 @@ from copy import deepcopy +from typing import Any, Iterable, List, Optional, Union +import pandas as pd from pandas import DataFrame -from sklearn.base import BaseEstimator, OutlierMixin, clone +from sklearn.base import BaseEstimator, OutlierMixin +from sklearn.datasets import load_svmlight_file from sklearn.utils.validation import check_is_fitted -import pandas as pd - class FragmentControl(BaseEstimator, OutlierMixin): - def __init__(self, pipeline): - self.pipeline = pipeline - self.fragmentor = deepcopy(pipeline[0]) - self.feature_names = [] + def __init__(self, pipeline: Any) -> None: + self.pipeline: Any = pipeline + self.fragmentor: Any = deepcopy(pipeline[0]) + self.feature_names: List[str] = [] try: check_is_fitted(self.pipeline) self.feature_names = pipeline[0].get_feature_names() - except: + except Exception: print("The pipeline is not fitted, you should fit it.") - def fit(self, X, y=None): + def fit(self, X: Any, y: Optional[Iterable[Any]] = None) -> "FragmentControl": self.pipeline.fit(X, y) self.fragmentor = deepcopy(self.pipeline[0]) self.feature_names = self.pipeline[0].get_feature_names() self.is_fitted_ = True return self - def predict(self, X, y=None): - res = [] + def predict( + self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None + ) -> List[int]: + res: List[int] = [] for i in range(len(X)): if isinstance(X, DataFrame): x = X.iloc[i] @@ -42,11 +45,16 @@ def predict(self, X, y=None): class BoundingBox(BaseEstimator, OutlierMixin): - def __init__(self, pipeline): - self.pipeline = pipeline - self.fragmentor = deepcopy(pipeline[0]) + def __init__(self, pipeline: Any) -> None: + self.pipeline: Any = pipeline + self.fragmentor: Any = 
deepcopy(pipeline[0]) - def fit(self, X, y=None, svm_file=None): + def fit( + self, + X: Any, + y: Optional[Iterable[Any]] = None, + svm_file: Optional[str] = None, + ) -> "BoundingBox": self.is_fitted_ = True if svm_file is not None: d, _ = load_svmlight_file(svm_file) @@ -57,8 +65,10 @@ def fit(self, X, y=None, svm_file=None): self.max_limits = descs.max(axis=0) return self - def predict(self, X, y=None): - res = [] + def predict( + self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None + ) -> List[int]: + res: List[int] = [] for i in range(len(X)): if isinstance(X, DataFrame): x = X.iloc[i] @@ -77,23 +87,29 @@ def predict(self, X, y=None): class PipelineWithAD(BaseEstimator): - def __init__(self, pipeline, ad_type, threshold=None): - self.ad_type = ad_type - self.pipeline = pipeline - self.threshold = threshold + def __init__( + self, pipeline: Any, ad_type: str, threshold: Optional[float] = None + ) -> None: + self.ad_type: str = ad_type + self.pipeline: Any = pipeline + self.threshold: Optional[float] = threshold if self.ad_type == "FragmentControl": self.ad_estimator = FragmentControl(self.pipeline) elif self.ad_type == "BoundingBox": - self.ad_estimator = BoudingBox(self.pipeline) + self.ad_estimator = BoudingBox( # type: ignore[name-defined] # noqa: F821 + self.pipeline + ) - def fit(self, X, y=None): + def fit(self, X: Any, y: Optional[Iterable[Any]] = None) -> "PipelineWithAD": self.is_fitted_ = True self.pipeline.fit(X, y) self.ad_estimator.fit(X, y) return self - def predict(self, X, y=None): - res = [] + def predict( + self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None + ) -> DataFrame: + res: List[tuple[Any, Any]] = [] for i in range(len(X)): if isinstance(X, DataFrame): x = X.iloc[i] @@ -102,9 +118,11 @@ def predict(self, X, y=None): res.append((self.pipeline.predict(x)[0], self.ad_estimator.predict(x)[0])) return pd.DataFrame(res, columns=["Predicted", "AD"]) - def predict_within_AD(self, X, y=None): - res = [] 
- indices = [] + def predict_within_AD( + self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None + ) -> DataFrame: + res: List[Any] = [] + indices: List[int] = [] for i in range(len(X)): if isinstance(X, DataFrame): x = X.iloc[i] diff --git a/doptools/estimators/consensus.py b/doptools/estimators/consensus.py index 62fa410..e5ce78f 100644 --- a/doptools/estimators/consensus.py +++ b/doptools/estimators/consensus.py @@ -1,7 +1,7 @@ -from typing import Tuple +from typing import Any, Iterable, List, Optional -import pandas as pd import numpy as np +import pandas as pd from sklearn import base from sklearn.base import BaseEstimator @@ -9,12 +9,12 @@ class ConsensusModel(BaseEstimator): - def __init__(self, pipelines): - self.model_type = "R" - self.ad_type = None - if isinstance(pipelines[0], Tuple): - self.names = [p[0] for p in pipelines] - self.pipelines = [p[1] for p in pipelines] + def __init__(self, pipelines: List[Any]) -> None: + self.model_type: str = "R" + self.ad_type: Optional[str] = None + if isinstance(pipelines[0], tuple): + self.names: List[str] = [p[0] for p in pipelines] + self.pipelines: List[Any] = [p[1] for p in pipelines] else: self.names = ["model" + str(i + 1) for i in range(len(pipelines))] self.pipelines = pipelines @@ -29,14 +29,19 @@ def __init__(self, pipelines): if issubclass(self.pipelines[0][-1].__class__, base.ClassifierMixin): self.model_type = "C" - def fit(self, X, y=None): + def fit(self, X: Any, y: Optional[Iterable[Any]] = None) -> "ConsensusModel": for p in self.pipelines: p.fit(X, y) self.is_fitted_ = True return self - def predict(self, X, y=None, output="all"): - preds = [] + def predict( + self, + X: Any, + y: Optional[Iterable[Any]] = None, + output: str = "all", + ) -> pd.DataFrame: + preds: List[Any] = [] if self.ad_type is None: preds = np.array([p.predict(X) for p in self.pipelines]).T @@ -68,7 +73,9 @@ def predict(self, X, y=None, output="all"): elif output == "preds": return res[self.names] - def 
predict_within_AD(self, X, y=None, output="all"): + def predict_within_AD( + self, X: Any, y: Optional[Iterable[Any]] = None, output: str = "all" + ) -> pd.DataFrame: if self.ad_type is None: return self.predict(X, y, output) else: diff --git a/doptools/optimizer/__init__.py b/doptools/optimizer/__init__.py index 9fecfd1..f71a746 100644 --- a/doptools/optimizer/__init__.py +++ b/doptools/optimizer/__init__.py @@ -16,6 +16,6 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . -from .config import * -from .optimizer import * -from .preparer import * +from .config import * # noqa: F401,F403 +from .optimizer import * # noqa: F401,F403 +from .preparer import * # noqa: F401,F403 diff --git a/doptools/optimizer/config.py b/doptools/optimizer/config.py index 1ee70f5..4d4d509 100644 --- a/doptools/optimizer/config.py +++ b/doptools/optimizer/config.py @@ -16,11 +16,13 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . 
-from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor -from sklearn.svm import SVC, SVR -from xgboost import XGBClassifier, XGBRegressor +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # noqa: F401 +from sklearn.svm import SVC, SVR # noqa: F401 +from xgboost import XGBClassifier, XGBRegressor # noqa: F401 -from doptools.chem.chem_features import ChythonCircus, ChythonLinear, Fingerprinter +from doptools.chem.chem_features import ChythonCircus # noqa: F401 +from doptools.chem.chem_features import ChythonLinear # noqa: F401 +from doptools.chem.chem_features import Fingerprinter # noqa: F401 methods = { "SVR": "SVR(**params, gamma='auto')", @@ -35,14 +37,14 @@ "circus": "ChythonCircus(**descriptor_params)", "chyline": "ChythonLinear(**descriptor_params)", "morgan": "Fingerprinter(fp_type='morgan', **descriptor_params)", - "morganfeatures": "Fingerprinter(fp_type='morgan', params={'useFeatures':True}, **descriptor_params)", + "morganfeatures": "Fingerprinter(fp_type='morgan', params={'useFeatures':True}, **descriptor_params)", # noqa: E501 "rdkfp": "Fingerprinter(fp_type='rdkfp', **descriptor_params)", - "rdkfplinear": "Fingerprinter(fp_type='rdkfp', params={'branchedPaths':False}, **descriptor_params)", + "rdkfplinear": "Fingerprinter(fp_type='rdkfp', params={'branchedPaths':False}, **descriptor_params)", # noqa: E501 "layered": "Fingerprinter(fp_type='layered', **descriptor_params)", "atompairs": "Fingerprinter(fp_type='atompairs', **descriptor_params)", "avalon": "Fingerprinter(fp_type='avalon', **descriptor_params)", "torsion": "Fingerprinter(fp_type='torsion', **descriptor_params)", - #'mordred2d': "Mordred2DCalculator(**descriptor_params)", + # 'mordred2d': "Mordred2DCalculator(**descriptor_params)", } diff --git a/doptools/optimizer/optimizer.py b/doptools/optimizer/optimizer.py index 0eea8eb..0989985 100644 --- a/doptools/optimizer/optimizer.py +++ b/doptools/optimizer/optimizer.py @@ -23,12 +23,13 @@ import os 
import warnings from functools import partial -from multiprocessing import Manager +from typing import Any, Dict, MutableMapping, Optional, Tuple, Union import numpy as np import optuna import pandas as pd from optuna.study import StudyDirection +from pandas import DataFrame from scipy.sparse import issparse from sklearn.datasets import load_svmlight_file from sklearn.feature_selection import VarianceThreshold @@ -55,11 +56,11 @@ class TopNPatienceCallback: - def __init__(self, patience: int, leaders: int = 1): - self.patience = patience - self.leaders = leaders - self._leaders_unchanged_steps = 0 - self._previous_leaders = () + def __init__(self, patience: int, leaders: int = 1) -> None: + self.patience: int = patience + self.leaders: int = leaders + self._leaders_unchanged_steps: int = 0 + self._previous_leaders: Tuple[int, ...] = () def __call__( self, study: optuna.study.Study, trial: optuna.trial.FrozenTrial @@ -92,19 +93,20 @@ def __call__( study.stop() -def collect_data(datadir, task, fmt="svm"): - desc_dict = {} - y = {} +def collect_data( + datadir: str, task: str, fmt: str = "svm" +) -> Tuple[Dict[str, Any], DataFrame]: + desc_dict: Dict[str, Any] = {} + y: Dict[str, Any] = {} for f in glob.glob(os.path.join(datadir, "*." 
+ fmt)): propname = f.split(os.sep)[-1].split(".")[0] name = f.split(os.sep)[-1][len(propname) + 1 : -4] - fullname = f.split(os.sep)[-1] if fmt == "svm": desc_dict[name], y[propname] = load_svmlight_file(f) elif fmt == "csv": data = pd.read_table(f) y[propname] = data[propname] - col_idx = list(data.columns).index() + col_idx = list(data.columns).index(propname) desc_dict[name] = data.iloc[:, col_idx + 1 :] if task.endswith("C"): return desc_dict, pd.DataFrame(y, dtype=int) @@ -112,8 +114,10 @@ def collect_data(datadir, task, fmt="svm"): return desc_dict, pd.DataFrame(y) -def calculate_scores(task, obs, pred): - def create_row(task, stat_name, x, y): +def calculate_scores(task: str, obs: DataFrame, pred: DataFrame) -> DataFrame: + def create_row( + task: str, stat_name: str, x: pd.Series, y: pd.Series + ) -> Dict[str, Union[str, float]]: if task == "R": return { "stat": stat_name, @@ -143,6 +147,7 @@ def create_row(task, stat_name, x, y): "F1": f1_score(x, y, average="macro"), "MCC": matthews_corrcoef(x, y), } + raise ValueError("Unknown task type") if task == "R": score_df = pd.DataFrame(columns=["stat", "R2", "RMSE", "MAE"]) @@ -173,21 +178,21 @@ def create_row(task, stat_name, x, y): def objective_study( - storage, - results_detailed, - trial, - x_dict, - y, - outdir, - method, - ntrials, - cv_splits, - cv_repeats, - jobs, - tmout, - earlystop, + storage: MutableMapping[int, Dict[str, Any]], + results_detailed: MutableMapping[int, Dict[str, Any]], + trial: optuna.trial.Trial, + x_dict: Dict[str, Any], + y: DataFrame, + outdir: str, + method: str, + ntrials: int, + cv_splits: int, + cv_repeats: int, + jobs: int, + tmout: int, + earlystop: Tuple[int, int], write_output: bool = True, -): +) -> float: n = trial.number if write_output and not os.path.exists(os.path.join(outdir, "trial." + str(n))): os.mkdir(os.path.join(outdir, "trial." 
+ str(n))) @@ -208,8 +213,6 @@ def objective_study( X = VarianceThreshold().fit_transform(X) params = suggest_params(trial, method) - # storage[n] = {"fit_score":fscore, 'desc': desc, 'scaling': scaling, 'method': method, **params} - model = get_raw_model(method, params) Y = np.array(y[y.columns[0]]) @@ -248,7 +251,7 @@ def objective_study( score_df = calculate_scores(method[-1], y, res_pd) - fit_scores = {} + fit_scores: Dict[str, Union[str, float]] = {} model.fit(X, Y) fit_preds = model.predict(X) if method.endswith("R"): @@ -329,21 +332,21 @@ def objective_study( def run_objective_study_with_timeout( - storage, - results_detailed, - x_dict, - y, - outdir, - method, - ntrials, - cv_splits, - cv_repeats, - jobs, - tmout, - earlystop, - write_output, - trial, -): + storage: MutableMapping[int, Dict[str, Any]], + results_detailed: MutableMapping[int, Dict[str, Any]], + x_dict: Dict[str, Any], + y: DataFrame, + outdir: str, + method: str, + ntrials: int, + cv_splits: int, + cv_repeats: int, + jobs: int, + tmout: int, + earlystop: Tuple[int, int], + write_output: bool, + trial: optuna.trial.Trial, +) -> float: timeouted_objective = timeout_decorator.timeout( tmout, timeout_exception=optuna.TrialPruned, use_signals=False )(objective_study) @@ -366,22 +369,22 @@ def run_objective_study_with_timeout( def launch_study( - x_dict, - y, - outdir, - method, - ntrials, - cv_splits, - cv_repeats, - jobs, - tmout, - earlystop, + x_dict: Dict[str, Any], + y: DataFrame, + outdir: str, + method: str, + ntrials: int, + cv_splits: int, + cv_repeats: int, + jobs: int, + tmout: int, + earlystop: Tuple[int, int], write_output: bool = True, -): +) -> Optional[Tuple[DataFrame, Dict[int, Any]]]: ctx = mp.get_context() with ctx.Manager() as manager: - results_dict = manager.dict() - results_detailed = manager.dict() + results_dict: MutableMapping[int, Dict[str, Any]] = manager.dict() + results_detailed: MutableMapping[int, Dict[str, Any]] = manager.dict() study = optuna.create_study( 
direction="maximize", sampler=optuna.samplers.TPESampler() @@ -414,25 +417,25 @@ def launch_study( **kwargs_opt ) - results_dict = dict(results_dict) - results_detailed = dict(results_detailed) + results_dict_local = dict(results_dict) + results_detailed_local = dict(results_detailed) - hyperparam_names = list(results_dict[next(iter(results_dict))].keys()) + hyperparam_names = list(results_dict_local[next(iter(results_dict_local))].keys()) results_pd = pd.DataFrame(columns=["trial"] + hyperparam_names + ["score"]) intermediate = study.trials_dataframe(attrs=("number", "value")) for i, row in intermediate.iterrows(): number = int(row.number) - if number not in results_dict: + if number not in results_dict_local: continue added_row = { "trial": number, "score": row.value, - "fit_score": results_dict[number]["fit_score"], + "fit_score": results_dict_local[number]["fit_score"], } for hp in hyperparam_names: - added_row[hp] = results_dict[number][hp] + added_row[hp] = results_dict_local[number][hp] results_pd = pd.concat( [pd.DataFrame([added_row]), results_pd.loc[:]] @@ -444,7 +447,8 @@ def launch_study( os.path.join(outdir, "trials.best"), sep=" ", index=False ) else: - return results_pd, results_detailed + return results_pd, results_detailed_local + return None __all__ = ["calculate_scores", "collect_data", "launch_study"] diff --git a/doptools/optimizer/preparer.py b/doptools/optimizer/preparer.py index 2c231be..d2ae87b 100644 --- a/doptools/optimizer/preparer.py +++ b/doptools/optimizer/preparer.py @@ -16,12 +16,11 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, see . 
-import argparse -import json -import multiprocessing as mp + import os import pickle import warnings +from typing import Any, Dict, Iterable, List, Tuple import numpy as np import pandas as pd @@ -36,18 +35,18 @@ warnings.simplefilter(action="ignore", category=DeprecationWarning) -def _set_default(argument, default_values): +def _set_default(argument: List[Any], default_values: List[Any]) -> List[Any]: if len(argument) > 0: return list(set(argument)) else: return default_values -def _enumerate_parameters(args): - def _make_name(iterable): +def _enumerate_parameters(args: Any) -> Dict[str, Dict[str, Any]]: + def _make_name(iterable: Iterable[Any]) -> str: return "_".join([str(i) for i in iterable]) - param_dict = {} + param_dict: Dict[str, Dict[str, Any]] = {} if args.morgan: for nb in _set_default(args.morgan_nBits, [1024]): for mr in _set_default(args.morgan_radius, [2]): @@ -114,13 +113,15 @@ def _make_name(iterable): return param_dict -def _pickle_descriptors(output_dir, fragmentor, prop_name, desc_name): +def _pickle_descriptors( + output_dir: str, fragmentor: Any, prop_name: str, desc_name: str +) -> None: fragmentor_name = os.path.join(output_dir, ".".join([prop_name, desc_name, "pkl"])) with open(fragmentor_name, "wb") as f: pickle.dump(fragmentor, f, pickle.HIGHEST_PROTOCOL) -def check_parameters(params): +def check_parameters(params: Any) -> None: if not params.input: raise ValueError("No input file.") if params.input.split(".")[-1] not in ("csv", "xls", "xlsx"): @@ -128,18 +129,20 @@ def check_parameters(params): for i, p in enumerate(params.property_col): if " " in p and len(params.property_names) < (i + 1): raise ValueError( - f"Column name {p} contains spaces in the name.\nPlease provide alternative names with --property_names option." + f"Column name {p} contains spaces in the name.\n" + "Please provide alternative names with --property_names option." 
) if params.property_names: if len(params.property_col) != len(params.property_names): raise ValueError( - "The number of alternative names is not equal to the number of properties." + "The number of alternative names is not equal to the number of " + "properties." ) -def create_input(input_params): - input_dict = {} - structures = [] +def create_input(input_params: Dict[str, Any]) -> Dict[str, Any]: + input_dict: Dict[str, Any] = {} + structures: List[Any] = [] if input_params["input_file"].endswith("csv"): data_table = pd.read_table(input_params["input_file"], sep=",") @@ -162,7 +165,7 @@ def create_input(input_params): for m in structures: try: m.canonicalize(fix_tautomers=False) - except: + except Exception: m.canonicalize(fix_tautomers=False) input_dict["structures"][col] = structures # input_dict['structures'] = structures @@ -178,10 +181,12 @@ def create_input(input_params): indices = list(y[pd.notnull(y)].index) if len(indices) < len(structures): print( - f"'{p}' column warning: only {len(indices)} out of {len(structures)} instances have the property." + f"'{p}' column warning: only {len(indices)} out of " + f"{len(structures)} instances have the property." ) print( - f"Molecules that don't have the property will be discarded from the set." + "Molecules that don't have the property will be discarded from the " + "set." 
) y = y.iloc[indices] y = np.array(y) @@ -199,9 +204,14 @@ def create_input(input_params): return input_dict -def calculate_descriptor_table(input_dict, desc_name, descriptor_params, out="all"): +def calculate_descriptor_table( + input_dict: Dict[str, Any], + desc_name: str, + descriptor_params: Dict[str, Any], + out: str = "all", +) -> Any: desc_type = desc_name.split("_")[0] - result = {"name": desc_name, "type": desc_type} + result: Dict[str, Any] = {"name": desc_name, "type": desc_type} for k, d in input_dict.items(): if k.startswith("prop"): base_column = list(input_dict["structures"].columns)[0] @@ -260,7 +270,9 @@ def calculate_descriptor_table(input_dict, desc_name, descriptor_params, out="al raise ValueError("The return value is not in the result dictionary") -def output_descriptors(calculated_result, output_params): +def output_descriptors( + calculated_result: Dict[str, Any], output_params: Dict[str, Any] +) -> None: desc_name = calculated_result["name"] desc_type = calculated_result["type"] @@ -268,9 +280,8 @@ def output_descriptors(calculated_result, output_params): if output_params["separate"]: output_folder = os.path.join(output_folder, desc_type) if not os.path.exists(output_folder): - os.makedirs( - output_folder, exist_ok=True - ) # exist_ok is useful when several processes try to create the folder at the same time + os.makedirs(output_folder, exist_ok=True) + # exist_ok helps when several processes try to create the folder at once print("The output directory {} created".format(output_folder)) for k, d in calculated_result.items(): if k.startswith("prop"): @@ -299,18 +310,19 @@ def output_descriptors(calculated_result, output_params): ) -def calculate_and_output(input_args): +def calculate_and_output( + input_args: Tuple[Dict[str, Any], str, Dict[str, Any], Dict[str, Any]] +) -> None: inpt, desc, descriptor_params, output_params = input_args result = calculate_descriptor_table(inpt, desc, descriptor_params) output_descriptors(result, 
output_params) -def create_output_dir(outdir): +def create_output_dir(outdir: str) -> None: if os.path.exists(outdir): print( - "The output directory {} already exists. The data may be overwritten".format( - outdir - ) + "The output directory {} already exists. The data may be " + "overwritten".format(outdir) ) else: os.makedirs(outdir) diff --git a/setup.py b/setup.py index 2571f3d..2c01e2c 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,10 @@ "openpyxl>=3.1", "pillow>=11.2.1", ], - description="A package for calculation of molecular descriptors in Scikit-Learn compatible way and model optimization", + description=( + "A package for calculation of molecular descriptors in Scikit-Learn " + "compatible way and model optimization" + ), long_description=(Path(__file__).parent / "README.rst") .open(encoding="utf-8") .read(), @@ -73,7 +76,8 @@ "Intended Audience :: Developers", "Topic :: Scientific/Engineering :: Chemistry", "Topic :: Software Development :: Libraries :: Python Modules", - "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)", + "License :: OSI Approved :: GNU Lesser General Public License v3 or later " + "(LGPLv3+)", "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 3", diff --git a/tests/chem/test_chem_features.py b/tests/chem/test_chem_features.py index d63b78e..7e068e7 100644 --- a/tests/chem/test_chem_features.py +++ b/tests/chem/test_chem_features.py @@ -1,11 +1,16 @@ """Tests for chem_features module.""" -from __future__ import annotations - import pandas as pd import pandas.testing as pdt import pytest +from doptools.chem.chem_features import ( + ChythonCircus, + ChythonLinear, + ComplexFragmentor, + Fingerprinter, + PassThrough, +) from tests.conftest import ( CHEM_CHYLINE_UPPER, CHEM_CIRCUS_UPPER, @@ -15,14 +20,6 @@ CHEM_RDKFP_RADIUS, ) -from doptools.chem.chem_features import ( - ChythonCircus, - ChythonLinear, - ComplexFragmentor, - Fingerprinter, - 
PassThrough, -) - @pytest.mark.parametrize("upper", CHEM_CIRCUS_UPPER) def test_chython_circus_counts_basic( diff --git a/tests/chem/test_coloratom.py b/tests/chem/test_coloratom.py index bcad5c5..bf19716 100644 --- a/tests/chem/test_coloratom.py +++ b/tests/chem/test_coloratom.py @@ -1,7 +1,5 @@ """Tests for coloratom helpers.""" -from __future__ import annotations - from doptools.chem.coloratom import ColorAtom diff --git a/tests/chem/test_solvents.py b/tests/chem/test_solvents.py index ed59534..d07d4e0 100644 --- a/tests/chem/test_solvents.py +++ b/tests/chem/test_solvents.py @@ -1,7 +1,5 @@ """Tests for solvents module.""" -from __future__ import annotations - import pandas.testing as pdt from doptools.chem.solvents import SolventVectorizer diff --git a/tests/chem/test_utils.py b/tests/chem/test_utils.py index e553388..d5efc93 100644 --- a/tests/chem/test_utils.py +++ b/tests/chem/test_utils.py @@ -1,7 +1,5 @@ """Tests for chem utils module.""" -from __future__ import annotations - from dataclasses import dataclass import pytest diff --git a/tests/cli/test_ensemble_model_rebuilding.py b/tests/cli/test_ensemble_model_rebuilding.py index 8069349..f17e697 100644 --- a/tests/cli/test_ensemble_model_rebuilding.py +++ b/tests/cli/test_ensemble_model_rebuilding.py @@ -1,7 +1,5 @@ """Stub tests for ensemble_model_rebuilding CLI.""" -from __future__ import annotations - import pytest diff --git a/tests/cli/test_launch_optimizer.py b/tests/cli/test_launch_optimizer.py index c629282..5fece42 100644 --- a/tests/cli/test_launch_optimizer.py +++ b/tests/cli/test_launch_optimizer.py @@ -1,10 +1,7 @@ """Tests for launch_optimizer CLI.""" -from __future__ import annotations - -import sys - import importlib +import sys import pandas as pd diff --git a/tests/cli/test_launch_preparer.py b/tests/cli/test_launch_preparer.py index 61dda19..bb8ebdd 100644 --- a/tests/cli/test_launch_preparer.py +++ b/tests/cli/test_launch_preparer.py @@ -1,7 +1,5 @@ """Tests for launch_preparer 
CLI.""" -from __future__ import annotations - import importlib import sys @@ -10,7 +8,7 @@ class _DummyPool: def __init__(self) -> None: - self.mapped = [] + self.mapped: list[tuple[object, list[object]]] = [] def map(self, func, iterable): self.mapped.append((func, list(iterable))) diff --git a/tests/cli/test_plotter.py b/tests/cli/test_plotter.py index 9f5ebb4..02dd6bb 100644 --- a/tests/cli/test_plotter.py +++ b/tests/cli/test_plotter.py @@ -1,7 +1,5 @@ """Stub tests for plotter CLI.""" -from __future__ import annotations - import pytest diff --git a/tests/cli/test_rebuilder.py b/tests/cli/test_rebuilder.py index a14b3bc..bc9e302 100644 --- a/tests/cli/test_rebuilder.py +++ b/tests/cli/test_rebuilder.py @@ -1,7 +1,5 @@ """Stub tests for rebuilder CLI.""" -from __future__ import annotations - import pytest diff --git a/tests/conftest.py b/tests/conftest.py index 1e61abb..e3b4cd9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,10 @@ """Shared pytest fixtures for doptools tests.""" -from __future__ import annotations - from pathlib import Path from typing import Any import pytest -import yaml +import yaml # type: ignore[import-untyped] _ROOT = Path(__file__).resolve().parent CHEM_DATA_DIR = _ROOT / "data" / "chem" diff --git a/tests/data/chem/generate_expected.py b/tests/data/chem/generate_expected.py index 4bf719b..28eb81d 100644 --- a/tests/data/chem/generate_expected.py +++ b/tests/data/chem/generate_expected.py @@ -1,12 +1,10 @@ """Generate expected descriptor outputs for chem tests.""" -from __future__ import annotations - from pathlib import Path -from typing import Iterable, Any +from typing import Any, Iterable import pandas as pd -import yaml +import yaml # type: ignore[import-untyped] from doptools.chem.chem_features import ( ChythonCircus, @@ -16,7 +14,6 @@ PassThrough, ) - ROOT = Path(__file__).resolve().parent CONFIG_PATH = ROOT / "config.yaml" @@ -31,8 +28,9 @@ def _write_csv(df: Any | pd.DataFrame, path: Path) -> None: 
df.to_csv(path, index=False) -def _smiles_to_dataframe(smiles: Iterable[str], numeric_values: list[int] - ) -> pd.DataFrame: +def _smiles_to_dataframe( + smiles: Iterable[str], numeric_values: list[int] +) -> pd.DataFrame: return pd.DataFrame({"mol": list(smiles), "num": numeric_values}) @@ -74,9 +72,10 @@ def generate() -> None: for radius in params["rdkfp_radius"]: fragmentor = ComplexFragmentor( associator=[ - ("mol", Fingerprinter(fp_type="rdkfp", - nBits=n_bits, - radius=radius)), + ( + "mol", + Fingerprinter(fp_type="rdkfp", nBits=n_bits, radius=radius), + ), ("numerical", PassThrough(["num"])), ], structure_columns=["mol"], diff --git a/tests/estimators/test_ad_estimators.py b/tests/estimators/test_ad_estimators.py index 3e44835..9128076 100644 --- a/tests/estimators/test_ad_estimators.py +++ b/tests/estimators/test_ad_estimators.py @@ -1,8 +1,5 @@ """Tests for ad_estimators module.""" -from __future__ import annotations - -import pandas as pd import pytest from sklearn.dummy import DummyRegressor from sklearn.pipeline import Pipeline diff --git a/tests/estimators/test_consensus.py b/tests/estimators/test_consensus.py index 420c577..4baba7a 100644 --- a/tests/estimators/test_consensus.py +++ b/tests/estimators/test_consensus.py @@ -1,7 +1,5 @@ """Tests for consensus module.""" -from __future__ import annotations - import pandas as pd from sklearn.dummy import DummyRegressor from sklearn.pipeline import Pipeline diff --git a/tests/optimizer/test_config.py b/tests/optimizer/test_config.py index bd67dc5..5adcee7 100644 --- a/tests/optimizer/test_config.py +++ b/tests/optimizer/test_config.py @@ -1,7 +1,5 @@ """Tests for optimizer config helpers.""" -from __future__ import annotations - from sklearn.svm import SVR from doptools.chem.chem_features import Fingerprinter diff --git a/tests/optimizer/test_optimizer.py b/tests/optimizer/test_optimizer.py index f1e2fc7..0503789 100644 --- a/tests/optimizer/test_optimizer.py +++ b/tests/optimizer/test_optimizer.py @@ 
-1,7 +1,5 @@ """Tests for optimizer module.""" -from __future__ import annotations - import numpy as np import optuna import pandas as pd diff --git a/tests/optimizer/test_preparer.py b/tests/optimizer/test_preparer.py index 06cf915..0cde911 100644 --- a/tests/optimizer/test_preparer.py +++ b/tests/optimizer/test_preparer.py @@ -1,7 +1,5 @@ """Tests for preparer module.""" -from __future__ import annotations - from types import SimpleNamespace import numpy as np diff --git a/tests/optimizer/test_utils.py b/tests/optimizer/test_utils.py index bacb94e..0639905 100644 --- a/tests/optimizer/test_utils.py +++ b/tests/optimizer/test_utils.py @@ -1,7 +1,5 @@ """Tests for optimizer utils.""" -from __future__ import annotations - import numpy as np import pytest