diff --git a/.isort.cfg b/.isort.cfg
new file mode 100644
index 0000000..ba529b2
--- /dev/null
+++ b/.isort.cfg
@@ -0,0 +1,3 @@
+[settings]
+profile = black
+line_length = 88
diff --git a/doptools/__init__.py b/doptools/__init__.py
index 3a5baa3..e409b00 100644
--- a/doptools/__init__.py
+++ b/doptools/__init__.py
@@ -16,6 +16,6 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-from .chem import *
-from .cli import *
-from .optimizer import *
+from .chem import * # noqa: F401,F403
+from .cli import * # noqa: F401,F403
+from .optimizer import * # noqa: F401,F403
diff --git a/doptools/chem/__init__.py b/doptools/chem/__init__.py
index 3afa2d4..9e83e5d 100644
--- a/doptools/chem/__init__.py
+++ b/doptools/chem/__init__.py
@@ -16,6 +16,6 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-from .chem_features import *
-from .coloratom import *
-from .solvents import *
+from .chem_features import * # noqa: F401,F403
+from .coloratom import * # noqa: F401,F403
+from .solvents import * # noqa: F401,F403
diff --git a/doptools/chem/chem_features.py b/doptools/chem/chem_features.py
index 7b1d731..9686d1e 100644
--- a/doptools/chem/chem_features.py
+++ b/doptools/chem/chem_features.py
@@ -16,9 +16,19 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-from tqdm import tqdm
from functools import partialmethod
-from typing import Dict, Iterable, List, Optional, Tuple
+from typing import (
+ Any,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Protocol,
+ Sequence,
+ Tuple,
+ Union,
+ cast,
+)
from warnings import warn
import numpy as np
@@ -27,8 +37,9 @@
from pandas import DataFrame
from rdkit import Chem, RDLogger
from rdkit.Avalon import pyAvalonTools
-from rdkit.Chem import AllChem, rdMolDescriptors
+from rdkit.Chem import AllChem
from sklearn.base import BaseEstimator, TransformerMixin
+from tqdm import tqdm
# from mordred import Calculator, descriptors
from doptools.chem.utils import _add_stereo_substructure
@@ -47,14 +58,14 @@ class DescriptorCalculator:
features of the calculator.
"""
- def __init__(self, name: str, size: Tuple[int]):
- self._name = name
- self._size = size
- self._short_name = name
- self.feature_names = []
+ def __init__(self, name: str, size: Tuple[int, ...]) -> None:
+ self._name: str = name
+ self._size: Tuple[int, ...] = size
+ self._short_name: str = name
+ self.feature_names: Union[List[str], Dict[int, List[Any]]] = []
@property
- def size(self) -> Tuple[int]:
+ def size(self) -> Tuple[int, ...]:
"""
Returns the size of the calculator as a tuple of integers.
"""
@@ -69,10 +80,10 @@ def name(self) -> str:
return self._name
@property
- def short_name(self):
+ def short_name(self) -> str:
return self._short_name
- def get_feature_names(self) -> List[str]:
+ def get_feature_names(self) -> List[str] | Dict[int, List[Any]]:
"""
Returns the list of features as strings.
"""
@@ -116,8 +127,8 @@ def __init__(
only_dynamic: bool = False,
on_bond: bool = False,
fmt: str = "mol",
- keep_stereo="no",
- ):
+ keep_stereo: str = "no",
+ ) -> None:
"""
Circus descriptor calculator constructor.
@@ -133,21 +144,23 @@ def __init__(
:param on_bond: toggle for calculating fragments centering on bonds.
:type on_bond: bool
- param fmt: format of the molecules for input ('mol' for MoleculeContainers, 'smiles' for strings).
+ :param fmt: format of the molecules for input ('mol' for MoleculeContainers,
+ 'smiles' for strings).
:type fmt: str
- param keep_stereo: ("yes", "no", or "both") applicable for reactions to generate stereo-keeping CGR fragments.
+ :param keep_stereo: ("yes", "no", or "both") applicable for reactions to
+ generate stereo-keeping CGR fragments.
:type keep_stereo: str
"""
- self.feature_names = []
- self.lower = lower
- self.upper = upper
- self.only_dynamic = only_dynamic
- self.fmt = fmt
- self.on_bond = on_bond
- self._name = "circus"
- self._size = (lower, upper)
- self.keep_stereo = keep_stereo
+ self.feature_names: List[str] = []
+ self.lower: int = lower
+ self.upper: int = upper
+ self.only_dynamic: bool = only_dynamic
+ self.fmt: str = fmt
+ self.on_bond: bool = on_bond
+ self._name: str = "circus"
+ self._size: Tuple[int, ...] = (lower, upper)
+ self.keep_stereo: str = keep_stereo
all_params = ["C", str(lower), str(upper)]
if on_bond:
all_params += ["B"]
@@ -159,7 +172,11 @@ def __init__(
all_params += ["BS"]
self._short_name = "-".join(all_params)
- def fit(self, X: DataFrame, y: Optional[List] = None):
+ def fit(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> "ChythonCircus":
"""
Fits the calculator - finds all possible substructures in the
given array of molecules/CGRs.
@@ -177,6 +194,7 @@ def fit(self, X: DataFrame, y: Optional[List] = None):
reac = None
if self.fmt == "smiles":
mol = smiles(mol)
+ mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol)
if isinstance(mol, ReactionContainer):
reac = mol
mol = reac.compose()
@@ -223,7 +242,11 @@ def fit(self, X: DataFrame, y: Optional[List] = None):
self.feature_names.append(sub_smiles)
return self
- def transform(self, X: Iterable, y: Optional[List] = None) -> DataFrame:
+ def transform(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> DataFrame:
"""
Transforms the given array of molecules/CGRs to a data frame
with features and their values.
@@ -238,10 +261,11 @@ def transform(self, X: Iterable, y: Optional[List] = None) -> DataFrame:
"""
table = pd.DataFrame(columns=self.feature_names)
for i, mol in enumerate(X):
- visited_substructures = []
+ visited_substructures: List[set[int]] = []
reac = None
if self.fmt == "smiles":
mol = smiles(mol)
+ mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol)
if isinstance(mol, ReactionContainer):
reac = mol
mol = reac.compose()
@@ -331,20 +356,24 @@ def __init__(
upper: int = 0,
only_dynamic: bool = False,
fmt: str = "mol",
- ):
- self.feature_names = []
- self.lower = lower
- self.upper = upper
- self.only_dynamic = only_dynamic
- self.fmt = fmt
- self._name = "chyline"
- self._size = (lower, upper)
+ ) -> None:
+ self.feature_names: List[str] = []
+ self.lower: int = lower
+ self.upper: int = upper
+ self.only_dynamic: bool = only_dynamic
+ self.fmt: str = fmt
+ self._name: str = "chyline"
+ self._size: Tuple[int, ...] = (lower, upper)
all_params = ["H", str(lower), str(upper)]
if only_dynamic:
all_params += ["D"]
self._short_name = "-".join(all_params)
- def fit(self, X: DataFrame, y: Optional[List] = None):
+ def fit(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> "ChythonLinear":
"""
Fits the calculator - finds all possible substructures in the
given array of molecules/CGRs.
@@ -358,20 +387,25 @@ def fit(self, X: DataFrame, y: Optional[List] = None):
:type y: None
"""
self.feature_names = []
- output = []
+ output: List[Dict[int, Any]] = []
for i, mol in enumerate(X):
if self.fmt == "smiles":
mol = smiles(mol)
+ mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol)
if isinstance(mol, ReactionContainer):
reac = mol
mol = reac.compose()
output.append(
mol.linear_smiles_hash(self.lower, self.upper, number_bit_pairs=0)
)
- self.feature_names = pd.DataFrame(output).columns
+ self.feature_names = list(pd.DataFrame(output).columns)
return self
- def transform(self, X: DataFrame, y: Optional[List] = None):
+ def transform(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> DataFrame:
"""
Transforms the given array of molecules/CGRs to a data frame
with features and their values.
@@ -386,20 +420,21 @@ def transform(self, X: DataFrame, y: Optional[List] = None):
"""
df = pd.DataFrame(columns=self.feature_names, dtype=int)
- output = []
+ output: List[Dict[int, Any]] = []
for m in X:
if self.fmt == "smiles":
m = smiles(m)
+ m = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], m)
if isinstance(m, ReactionContainer):
reac = m
m = reac.compose()
output.append(
m.linear_smiles_hash(self.lower, self.upper, number_bit_pairs=0)
)
- output = pd.DataFrame(output)
- output = output.map(lambda x: len(x) if isinstance(x, list) else 0)
+ output_df = pd.DataFrame(output)
+ output_df = output_df.map(lambda x: len(x) if isinstance(x, list) else 0)
- output2 = output[output.columns.intersection(df.columns)]
+ output2 = output_df[output_df.columns.intersection(df.columns)]
df = pd.concat([df, output2])
df = df.fillna(0)
return df
@@ -421,39 +456,43 @@ class Fingerprinter(DescriptorCalculator, BaseEstimator, TransformerMixin):
def __init__(
self,
- fp_type,
+ fp_type: str,
nBits: int = 1024,
- radius=None,
- params=None,
- fmt="mol",
- chirality=False,
- ):
+ radius: Optional[int] = None,
+ params: Optional[Dict[str, Any]] = None,
+ fmt: str = "mol",
+ chirality: bool = False,
+ ) -> None:
if params is None:
params = {}
- self.fp_type = fp_type
- self.nBits = nBits
- self.fmt = fmt
+ self.fp_type: str = fp_type
+ self.nBits: int = nBits
+ self.fmt: str = fmt
if radius is None:
- self._size = (nBits,)
+ self._size: Tuple[int, ...] = (nBits,)
else:
self._size = (radius, nBits)
- self.radius = radius
- self.params = params
- self.chirality = chirality
- self.info = dict([(i, []) for i in range(self.nBits)])
- self.feature_names = dict([(i, []) for i in range(self.nBits)])
- self.feature_names_chython = dict([(i, []) for i in range(self.nBits)])
+ self.radius: Optional[int] = radius
+ self.params: Dict[str, Any] = params
+ self.chirality: bool = chirality
+ self.info: Dict[int, List[Any]] = dict([(i, []) for i in range(self.nBits)])
+ self.feature_names: Dict[int, List[Any]] = dict(
+ [(i, []) for i in range(self.nBits)]
+ )
+ self.feature_names_chython: Dict[int, List[Any]] = dict(
+ [(i, []) for i in range(self.nBits)]
+ )
if (
fp_type == "morgan"
and "useFeatures" in params.keys()
- and params["useFeatures"] == True
+ and params["useFeatures"] is True
):
self._name = "morganfeatures"
self._short_name = "-".join(["MF", str(nBits), str(radius)])
elif (
fp_type == "rdkfp"
and "branchedPaths" in params.keys()
- and params["branchedPaths"] == False
+ and params["branchedPaths"] is False
):
self._name = "rdkfplinear"
self._short_name = "-".join(["RL", str(nBits), str(radius)])
@@ -472,7 +511,11 @@ def __init__(
all_params.append(str(radius))
self._short_name = "-".join(all_params)
- def fit(self, X: DataFrame, y=None):
+ def fit(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> "Fingerprinter":
"""
Fits the fingerprint calculator.
@@ -486,8 +529,8 @@ def fit(self, X: DataFrame, y=None):
return self
- def get_features(self, x, output="smiles"):
- features = dict([(i, []) for i in range(self.nBits)])
+ def get_features(self, x: Any, output: str = "smiles") -> Dict[int, Any]:
+ features: Dict[int, Any] = dict([(i, []) for i in range(self.nBits)])
m = Chem.MolFromSmiles(str(x))
if self.fp_type == "avalon":
pass
@@ -502,7 +545,8 @@ def get_features(self, x, output="smiles"):
if not hasattr(self, "chirality"): # Back compatibility
self.chirality = False
warn(
- "Compatibility mode: The pipeline was created with an older version of DOPTools. Consider recreating it"
+ "Compatibility mode: The pipeline was created with an older "
+ "version of DOPTools. Consider recreating it"
)
if "useFeatures" in self.params and self.params["useFeatures"]:
@@ -518,13 +562,13 @@ def get_features(self, x, output="smiles"):
)
ao = AllChem.AdditionalOutput()
ao.CollectBitInfoMap()
- desc = frg.GetFingerprintAsNumPy(m, additionalOutput=ao)
+ frg.GetFingerprintAsNumPy(m, additionalOutput=ao)
bmap = ao.GetBitInfoMap()
for k, v in bmap.items():
for i in v:
if i[1] > 0:
env = Chem.FindAtomEnvironmentOfRadiusN(m, i[1], i[0])
- amap = {}
+ amap: Dict[int, int] = {}
submol = Chem.PathToSubmol(m, env, atomMap=amap)
if output == "smiles":
features[k].append(Chem.MolToSmiles(submol, canonical=True))
@@ -550,7 +594,7 @@ def get_features(self, x, output="smiles"):
)
ao = AllChem.AdditionalOutput()
ao.CollectBitPaths()
- desc = frg.GetFingerprintAsNumPy(m, additionalOutput=ao)
+ frg.GetFingerprintAsNumPy(m, additionalOutput=ao)
bmap = ao.GetBitPaths()
for k, v in bmap.items():
for i in v:
@@ -600,7 +644,11 @@ def get_features(self, x, output="smiles"):
def get_feature_names(self) -> List[str]:
return [str(i) for i in range(self.nBits)]
- def transform(self, X, y=None):
+ def transform(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> DataFrame:
"""
Transforms the given array of molecules to a data frame
with features and their values.
@@ -628,7 +676,8 @@ def transform(self, X, y=None):
if not hasattr(self, "chirality"): # Back compatibility
self.chirality = False
warn(
- "Compatibility mode: The pipeline was created with an older version of DOPTools. Consider recreating it"
+ "Compatibility mode: The pipeline was created with an older "
+ "version of DOPTools. Consider recreating it"
)
if self.fp_type == "atompairs":
@@ -666,34 +715,57 @@ def transform(self, X, y=None):
return pd.DataFrame(np.array(res), columns=[str(i) for i in range(self.nBits)])
+class DescriptorLike(Protocol):
+ short_name: str
+
+ def fit(
+ self, X: Any, y: Optional[List[Any]] = None
+ ) -> "DescriptorLike": # pragma: no cover - typing protocol
+ ...
+
+ def transform(self, X: Any, y: Optional[List[Any]] = None) -> DataFrame: ...
+
+ def get_feature_names(self) -> List[str]: ...
+
+
class ComplexFragmentor(DescriptorCalculator, BaseEstimator, TransformerMixin):
"""
- ComplexFragmentor class is a scikit-learn compatible transformer that concatenates the features
- according to specified associations. The most important argument is the "associator" - a list of tuples
- that establishes the correspondence between a column in a data frame X and the transformer
- that is trained on it (similarly to how sklearn Pipeline works).
-
- For example, say you have a data frame with molecules/CGRs in one column ("molecules"), and
- solvents in another ("solvent"). You want to generate a feture table that includes both structural
- and solvent descriptors. You would define a ComplexFragmentor class with associator as a list of tuples,
- where each tuple is a pair of column names and the corresponding feature generators. In this case, e.g.,
+ ComplexFragmentor class is a scikit-learn compatible transformer that concatenates
+ the features according to specified associations. The most important argument is
+ the "associator" - a list of tuples that establishes the correspondence between a
+ column in a data frame X and the transformer that is trained on it (similarly to
+ how sklearn Pipeline works).
+
+ For example, say you have a data frame with molecules/CGRs in one column
+ ("molecules"), and solvents in another ("solvent"). You want to generate a
+ feature table that includes both structural and solvent descriptors. You would
+ define a ComplexFragmentor class with associator as a list of tuples, where each
+ tuple is a pair of column names and the corresponding feature generators. In this
+ case, e.g.,
associator = [("molecules", Augmentor(lower=a, upper=b)),
- ("solvent":SolventVectorizer())] # see CIMTools library for solvent features
+ ("solvent":SolventVectorizer())] # see CIMTools for features
- ComplexFragmentor assumes that one of the types of features will be structural, thus,
- "structure_column" parameter defines the column of the data frame where structures are found.
+ ComplexFragmentor assumes that one of the types of features will be structural,
+ thus, "structure_column" parameter defines the column of the data frame where
+ structures are found.
"""
- def __init__(self, associator: List[Tuple[str, object]], structure_columns=None):
- self.structure_columns = [] if structure_columns is None else structure_columns
- self.associator = associator
+ def __init__(
+ self,
+ associator: Sequence[Tuple[str, DescriptorLike]],
+ structure_columns: Optional[List[str]] = None,
+ ) -> None:
+ self.structure_columns: List[str] = (
+ [] if structure_columns is None else structure_columns
+ )
+ self.associator: List[Tuple[str, DescriptorLike]] = list(associator)
# self.fragmentor = self.associator[self.structure_column]
- self.feature_names = []
- self._name = "ComplexFragmentor"
- self._short_name = ".".join([c[1].short_name for c in associator])
+ self.feature_names: List[str] = []
+ self._name: str = "ComplexFragmentor"
+ self._short_name: str = ".".join([c[1].short_name for c in associator])
- def fit(self, x: DataFrame, y: Optional[List] = None):
+ def fit(self, x: DataFrame, y: Optional[List[Any]] = None) -> "ComplexFragmentor":
"""
Fits the calculator - finds all possible substructures in the
given array of molecules/CGRs.
@@ -715,7 +787,11 @@ def fit(self, x: DataFrame, y: Optional[List] = None):
self.feature_names += [k + "::" + f for f in v.get_feature_names()]
return self
- def transform(self, x: DataFrame, y: Optional[List] = None) -> DataFrame:
+ def transform(
+ self,
+ x: Union[DataFrame, Dict[str, Any], List[Any], pd.Series],
+ y: Optional[List[Any]] = None,
+ ) -> DataFrame:
"""
Transforms the given data frame to a data frame of features
with their values. Applies each feature generator
@@ -730,7 +806,7 @@ def transform(self, x: DataFrame, y: Optional[List] = None) -> DataFrame:
doesn't change the function at all.
:type y: None
"""
- concat = []
+ concat: List[DataFrame] = []
if not isinstance(x, DataFrame) and isinstance(x, (dict, list, pd.Series)):
x = pd.DataFrame(x if isinstance(x, list) else [x])
for k, v in self.associator:
@@ -805,22 +881,22 @@ class PassThrough(DescriptorCalculator, BaseEstimator, TransformerMixin):
ComplexFragmentor.
"""
- def __init__(self, column_names: List[str]):
- self.column_names = column_names
- self.feature_names = self.column_names
- self._name = "numerical"
- self._short_name = "N"
- self._size = ()
+ def __init__(self, column_names: List[str]) -> None:
+ self.column_names: List[str] = column_names
+ self.feature_names: List[str] = self.column_names
+ self._name: str = "numerical"
+ self._short_name: str = "N"
+ self._size: Tuple[int, ...] = ()
- def fit(self, x: DataFrame, y=None):
+ def fit(self, x: DataFrame, y: Optional[List[Any]] = None) -> "PassThrough":
"""
Fits the calculator. Parameters are not necessary.
"""
return self
def transform(
- self, x: DataFrame, y: Optional[List] = None, check: Optional[bool] = True
- ):
+ self, x: DataFrame, y: Optional[List[Any]] = None, check: bool = True
+ ) -> DataFrame:
"""
Returns the column without any transformation.
@@ -839,7 +915,7 @@ def transform(
raise ValueError("Non numerical value(s) provided to PassThrough")
return df
- def get_feature_names(self):
+ def get_feature_names(self) -> List[str]:
return self.feature_names
@@ -877,7 +953,7 @@ def __init__(
upper: int = 0,
only_dynamic: bool = False,
fmt: str = "mol",
- ):
+ ) -> None:
"""
Circus descriptor calculator constructor.
@@ -890,19 +966,24 @@ def __init__(
:param only_dynamic: toggle for calculating only fragments with dynamic items.
:type only_dynamic: bool
- param fmt: format of the molecules for input ('mol' for MoleculeContainers, 'smiles' for strings).
+ :param fmt: format of the molecules for input ('mol' for MoleculeContainers,
+ 'smiles' for strings).
:type fmt: str
"""
- self.feature_names = []
- self.features = []
- self.lower = lower
- self.upper = upper
- self.only_dynamic = only_dynamic
- self.fmt = fmt
- self._name = "linear"
- self._size = (lower, upper)
-
- def fit(self, X: DataFrame, y: Optional[List] = None):
+ self.feature_names: List[str] = []
+ self.features: List[Any] = []
+ self.lower: int = lower
+ self.upper: int = upper
+ self.only_dynamic: bool = only_dynamic
+ self.fmt: str = fmt
+ self._name: str = "linear"
+ self._size: Tuple[int, ...] = (lower, upper)
+
+ def fit(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> "ChythonCircusNonhash":
"""
Fits the calculator - finds all possible substructures in the
given array of molecules/CGRs.
@@ -920,6 +1001,7 @@ def fit(self, X: DataFrame, y: Optional[List] = None):
for i, mol in enumerate(X):
if self.fmt == "smiles":
mol = smiles(mol)
+ mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol)
for length in range(self.lower, self.upper + 1):
for atom in mol.atoms():
# deep is the radius of the neighborhood sphere in bonds
@@ -932,7 +1014,11 @@ def fit(self, X: DataFrame, y: Optional[List] = None):
self.features.append(sub)
return self
- def transform(self, X: DataFrame, y: Optional[List] = None) -> DataFrame:
+ def transform(
+ self,
+ X: Iterable[Union[MoleculeContainer, CGRContainer, ReactionContainer, str]],
+ y: Optional[List[Any]] = None,
+ ) -> DataFrame:
"""
Transforms the given array of molecules/CGRs to a data frame
with features and their values.
@@ -949,15 +1035,18 @@ def transform(self, X: DataFrame, y: Optional[List] = None) -> DataFrame:
for i, mol in enumerate(X):
if self.fmt == "smiles":
mol = smiles(mol)
+ mol = cast(Union[MoleculeContainer, CGRContainer, ReactionContainer], mol)
table.loc[len(table)] = 0
for sub in self.features:
- # if CGRs are used, the transformation of the substructure to the CGRcontainer is needed
+ # If CGRs are used, transformation of the substructure to the
+ # CGR container is needed.
mapping = list(sub.get_mapping(mol))
- # mapping is the list of all possible substructure mappings into the given molecule/CGR
+ # Mapping is the list of all possible substructure mappings into
+ # the given molecule/CGR.
table.loc[i, str(sub)] = len(mapping)
return table
- def get_feature_names(self):
+ def get_feature_names(self) -> List[str]:
return self.feature_names
diff --git a/doptools/chem/coloratom.py b/doptools/chem/coloratom.py
index ef6ab79..a2e180d 100644
--- a/doptools/chem/coloratom.py
+++ b/doptools/chem/coloratom.py
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
+# mypy: ignore-errors
+# flake8: noqa
#
# Copyright 2022-2025 Pavel Sidorov This
# file is part of DOPTools repository.
diff --git a/doptools/chem/utils.py b/doptools/chem/utils.py
index 87cb172..6e648fa 100644
--- a/doptools/chem/utils.py
+++ b/doptools/chem/utils.py
@@ -80,7 +80,6 @@ def _pos_in_string_atom(cgr, cgr_string, number):
def _add_stereo_substructure(substructure, reaction):
- substructure_atoms = list(substructure._atoms)
cts = _gather_ct_stereos(reaction)
rss = _gather_rs_stereos(reaction)
cgr_smiles = str(substructure)
diff --git a/doptools/cli/__init__.py b/doptools/cli/__init__.py
index 1cabc71..08ebb73 100644
--- a/doptools/cli/__init__.py
+++ b/doptools/cli/__init__.py
@@ -16,7 +16,8 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-from .launch_optimizer import *
-from .launch_preparer import *
-from .plotter import *
+from .launch_optimizer import * # noqa: F401,F403
+from .launch_preparer import * # noqa: F401,F403
+from .plotter import * # noqa: F401,F403
+
# from .rebuilder import *
diff --git a/doptools/cli/ensemble_model_rebuilding.py b/doptools/cli/ensemble_model_rebuilding.py
index 7a4a971..7b982f2 100644
--- a/doptools/cli/ensemble_model_rebuilding.py
+++ b/doptools/cli/ensemble_model_rebuilding.py
@@ -1,3 +1,5 @@
+# flake8: noqa
+
import argparse
import glob
import logging
@@ -8,12 +10,12 @@
import sys
from functools import partial
from multiprocessing import Manager
+from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from chython import smiles
-from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.datasets import load_svmlight_file
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.feature_selection import VarianceThreshold
@@ -25,7 +27,7 @@
from sklearn.metrics import r2_score as r2
from sklearn.metrics import root_mean_squared_error as rmse
from sklearn.pipeline import Pipeline
-from sklearn.preprocessing import FunctionTransformer, MinMaxScaler
+from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVC, SVR
from xgboost import XGBClassifier, XGBRegressor
@@ -39,7 +41,9 @@
)
-def populate_trials_dictionary(trials_folders):
+def populate_trials_dictionary(
+ trials_folders: List[str],
+) -> Dict[str, Dict[str, str]]:
"""
Populate a dictionary with trial information from specified folders.
@@ -62,7 +66,7 @@ def populate_trials_dictionary(trials_folders):
trials_file = os.path.join(folder, "trials.best")
if os.path.isfile(trials_file):
- df = pd.read_csv(trials_file, sep="\s+")
+ df = pd.read_csv(trials_file, sep=r"\s+")
if "method" in df.columns:
method_value = df["method"].iloc[0]
if method_value in trials_dict:
@@ -84,7 +88,7 @@ def populate_trials_dictionary(trials_folders):
return trials_dict
-def create_output_dir(outdir):
+def create_output_dir(outdir: str) -> None:
"""
Create an output directory if it does not already exist.
@@ -105,7 +109,11 @@ def create_output_dir(outdir):
logging.info("The output directory {} created".format(outdir))
-def select_best_CV_models(trials_info_dict, model_type, nb_classes):
+def select_best_CV_models(
+ trials_info_dict: Dict[str, Dict[str, str]],
+ model_type: str,
+ nb_classes: Optional[int],
+) -> pd.DataFrame:
"""
Selects up to 15 best models based on the model's score in CV. Only one model is selected per descriptor space per ML method.
@@ -117,7 +125,7 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes):
Returns:
pandas.DataFrame: A DataFrame containing the selected best models sorted by score in descending order.
"""
- models_by_desc = {}
+ models_by_desc: Dict[str, List[Dict[str, Any]]] = {}
highest_score = float("-inf")
for method, info in trials_info_dict.items():
@@ -138,7 +146,7 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes):
with open(trials_file, "w") as file:
file.writelines(corrected_lines)
- model_stats = pd.read_csv(trials_file, sep="\s+")
+ model_stats = pd.read_csv(trials_file, sep=r"\s+")
highest_score = max(highest_score, model_stats["score"].max())
# Per each descriptor space only one (the best) descriptor space is selected.
for desc, group in model_stats.groupby("desc"):
@@ -146,7 +154,9 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes):
models_by_desc[desc] = []
models_by_desc[desc].extend(group.to_dict("records"))
- threshold = 1 / nb_classes if model_type == "class" else 0.5
+ if model_type == "class" and nb_classes is None:
+ raise ValueError("nb_classes must be provided for classification models.")
+ threshold = 1 / cast(int, nb_classes) if model_type == "class" else 0.5
if highest_score < threshold:
if not args.desperate:
logging.info(
@@ -171,7 +181,13 @@ def select_best_CV_models(trials_info_dict, model_type, nb_classes):
return best_models.head(10)
-def create_model_folder(desc_folder, outdir, models_from_CV, input_df, test_set_df):
+def create_model_folder(
+ desc_folder: str,
+ outdir: str,
+ models_from_CV: pd.DataFrame,
+ input_df: pd.DataFrame,
+ test_set_df: Optional[pd.DataFrame],
+) -> None:
"""
Create a folder containing relevant files (pickled pipelines and associated descriptor files) based on the best models and copy the training set file.
@@ -197,7 +213,7 @@ def create_model_folder(desc_folder, outdir, models_from_CV, input_df, test_set_
shutil.copyfile(file_path, os.path.join(outdir, file_name))
-def load_pkl(pkl_file):
+def load_pkl(pkl_file: str) -> Any:
"""
Load a pickled file from the given path.
@@ -216,14 +232,14 @@ def load_pkl(pkl_file):
def rebuild_and_evaluate_reg_model(
- model_row_tuple,
- shared_data,
- outdir,
- desc_folder,
- property_col,
- model_type,
- predict_df,
-):
+ model_row_tuple: Tuple[int, pd.Series],
+ shared_data: List[Dict[str, Any]],
+ outdir: str,
+ desc_folder: str,
+ property_col: str,
+ model_type: str,
+ predict_df: pd.DataFrame,
+) -> float:
"""
Rebuilds a regression model from specified parameters and evaluates it using the provided prediction dataset.
This function serves as a workaround to overcome the problem when a regression model gets decent score during CV.
@@ -304,8 +320,13 @@ def rebuild_and_evaluate_reg_model(
def rebuild_model(
- model_row_tuple, shared_data, outdir, desc_folder, property_col, model_type
-):
+ model_row_tuple: Tuple[int, pd.Series],
+ shared_data: List[Dict[str, Any]],
+ outdir: str,
+ desc_folder: str,
+ property_col: str,
+ model_type: str,
+) -> None:
"""
Rebuild a machine learning model based on the provided model information and input data.
@@ -414,7 +435,11 @@ def rebuild_model(
logging.info(f"{model_filename} saved.")
-def aggregate_CV_predictions(trials_info_dict, best_models, model_type):
+def aggregate_CV_predictions(
+ trials_info_dict: Dict[str, Dict[str, str]],
+ best_models: pd.DataFrame,
+ model_type: str,
+) -> pd.DataFrame:
"""
Aggregate predictions from various models and create a summary DataFrame.
@@ -440,7 +465,7 @@ def aggregate_CV_predictions(trials_info_dict, best_models, model_type):
if os.path.isfile(file_path):
# Read the predictions for the current model.
- trial_predictions = pd.read_csv(file_path, sep="\s+")
+ trial_predictions = pd.read_csv(file_path, sep=r"\s+")
# Extract the actual values and predicted values based on column headers
if actual_values is None:
actual_values = trial_predictions.filter(like=".observed").iloc[
@@ -490,7 +515,9 @@ def aggregate_CV_predictions(trials_info_dict, best_models, model_type):
return final_df
-def evaluate_AD_apply_model(desc_file, shared_molecules):
+def evaluate_AD_apply_model(
+ desc_file: str, shared_molecules: Iterable[Dict[str, Any]]
+) -> pd.DataFrame:
"""
Evaluate the applicability domain of the compounds and apply the model in the given descriptor space
@@ -509,7 +536,9 @@ def evaluate_AD_apply_model(desc_file, shared_molecules):
"""
- def frag_ctrl(p_DF, train_fragments, desc_space):
+ def frag_ctrl(
+ p_DF: pd.DataFrame, train_fragments: set[Any], desc_space: str
+ ) -> pd.DataFrame:
"""
Update the confidence level column based on the fragment control check.
@@ -522,7 +551,7 @@ def frag_ctrl(p_DF, train_fragments, desc_space):
pd.DataFrame: The updated DataFrame with confidence levels.
"""
- def conf_update(row):
+ def conf_update(row: pd.Series) -> pd.Series:
"""
Update the confidence level for a single row based on the fragment control check.
@@ -558,7 +587,12 @@ def conf_update(row):
return p_DF.apply(conf_update, axis=1)
- def bbox(p_DF, max_train_descs, p_descs, desc_space):
+ def bbox(
+ p_DF: pd.DataFrame,
+ max_train_descs: np.ndarray,
+ p_descs: np.ndarray,
+ desc_space: str,
+ ) -> pd.DataFrame:
"""
Update the confidence level column based on the bounding box check.
@@ -615,7 +649,7 @@ def bbox(p_DF, max_train_descs, p_descs, desc_space):
0
] # Extract the file name without extension
print(model_name)
- model_pipeline = load_pkl(model_path)
+ model_pipeline: Any = load_pkl(model_path)
# Initialize column 'Conf' + desc_space populated with zeros
shared_predict_df[f"Conf-{desc_space}"] = 0
@@ -655,7 +689,11 @@ def bbox(p_DF, max_train_descs, p_descs, desc_space):
return shared_predict_df
-def aggregate_test_predictions(all_predictions, ext_test_set_DF, model_type):
+def aggregate_test_predictions(
+ all_predictions: Dict[str, Dict[str, Any]],
+ ext_test_set_DF: pd.DataFrame,
+ model_type: str,
+) -> Dict[str, pd.DataFrame]:
"""
Aggregates prediction data for a given external test set DataFrame and calculates
confidence levels and statistical summaries based on the model type.
@@ -684,7 +722,7 @@ def aggregate_test_predictions(all_predictions, ext_test_set_DF, model_type):
to the provided model type.
"""
- def in_AD_aggregation(df_row):
+ def in_AD_aggregation(df_row: pd.Series) -> pd.Series:
"""
Aggregates predictions for molecules within the applicability domain.
@@ -789,7 +827,9 @@ def in_AD_aggregation(df_row):
return {"In_AD": DF_in_AD, "Out_AD": DF_out_AD}
-def calculate_scores(final_df, property_col, model_type):
+def calculate_scores(
+ final_df: pd.DataFrame, property_col: str, model_type: str
+) -> Dict[str, float]:
"""
Calculate evaluation scores based on the true and predicted values.
@@ -817,7 +857,13 @@ def calculate_scores(final_df, property_col, model_type):
return scores
-def plot_regression(dataframe, property_col, scores, outdir, test_set_df):
+def plot_regression(
+ dataframe: pd.DataFrame,
+ property_col: str,
+ scores: Dict[str, float],
+ outdir: str,
+ test_set_df: Optional[pd.DataFrame],
+) -> None:
"""
Create a regression plot based on the true and predicted values and save it to the specified output directory.
@@ -906,8 +952,13 @@ def plot_regression(dataframe, property_col, scores, outdir, test_set_df):
def generate_confusion_matrix(
- dataframe, scores, outdir, nb_classes, class_info, test_set_df
-):
+ dataframe: pd.DataFrame,
+ scores: Dict[str, float],
+ outdir: str,
+ nb_classes: int,
+ class_info: str,
+ test_set_df: Optional[pd.DataFrame],
+) -> None:
"""
Generate a confusion matrix and write it along with scores to a file in the specified output directory.
@@ -1078,10 +1129,11 @@ def generate_confusion_matrix(
property_col = args.property_col
# Maybe not the most elegant solution, but it does what it needs to do. Maybe will refactor one day
- final_DF = None
- final_DF_out_AD = None
+ final_DF: Optional[pd.DataFrame] = None
+ final_DF_out_AD: Optional[pd.DataFrame] = None
# Validate model type and number of classes
+ nb_classes: Optional[int]
if model_type == "class":
if args.class_info is None:
logging.error(
@@ -1191,9 +1243,11 @@ def generate_confusion_matrix(
partial_rebuild_and_evaluatefunc = partial(
rebuild_and_evaluate_reg_model, **kwargs
)
- results = pool.map(partial_rebuild_and_evaluatefunc, first_func_args)
+ eval_scores = pool.map(
+ partial_rebuild_and_evaluatefunc, first_func_args
+ )
# Assign the results back to the model_from_CV. It is safe to do that, because when using pool.map() the order of the results is preserved relative to the order of the inputs.
- models_from_CV["evaluation_score"] = results
+ models_from_CV["evaluation_score"] = eval_scores
indices_to_drop = models_from_CV[
models_from_CV["evaluation_score"] < 0.5
].index # Do you really want to live in a world were models with such score are getting accepted?
@@ -1276,7 +1330,7 @@ def generate_confusion_matrix(
if model_type == "reg":
minimal_row_requirement = 2
else:
- minimal_row_requirement = nb_classes
+ minimal_row_requirement = cast(int, nb_classes)
# Handling compounds in AD
if len(final_DF) >= minimal_row_requirement:
@@ -1288,7 +1342,12 @@ def generate_confusion_matrix(
plot_regression(final_DF, property_col, scores, model_folder, test_set_df)
else:
generate_confusion_matrix(
- final_DF, scores, model_folder, nb_classes, class_info, test_set_df
+ final_DF,
+ scores,
+ model_folder,
+ cast(int, nb_classes),
+ class_info,
+ test_set_df,
)
else:
logging.info(
diff --git a/doptools/cli/launch_optimizer.py b/doptools/cli/launch_optimizer.py
index 36119f5..393c181 100644
--- a/doptools/cli/launch_optimizer.py
+++ b/doptools/cli/launch_optimizer.py
@@ -21,12 +21,10 @@
import contextlib
import os
import warnings
-from functools import partial
-from multiprocessing import Manager
import optuna
-from doptools.optimizer.optimizer import *
+from doptools.optimizer.optimizer import collect_data, launch_study
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
@@ -34,30 +32,42 @@
optuna.logging.set_verbosity(optuna.logging.WARNING)
-def launch_optimizer():
+def launch_optimizer() -> None:
parser = argparse.ArgumentParser(
prog="Optuna optimizer",
- description='Optimizes the hyperparameters of ML method on given data, as well as selects the "best" descriptor space.',
+ description=(
+ "Optimizes the hyperparameters of ML method on given data, as well as "
+ 'selects the "best" descriptor space.'
+ ),
)
parser.add_argument(
"-d",
"--datadir",
required=True,
- help="Path to the directory containing the descriptors files to run the optimisation on.",
+ help=(
+ "Path to the directory containing the descriptors files to run the "
+ "optimisation on."
+ ),
)
parser.add_argument(
"-o",
"--outdir",
required=True,
- help="Path to the output directory where the results optimization will be saved.",
+ help=(
+ "Path to the output directory where the results optimization will be "
+ "saved."
+ ),
)
parser.add_argument(
"--ntrials",
type=int,
default=100,
- help="Number of hyperparameter sets to explore. After exploring this number of sets, the optimization stops. Default = 100.",
+ help=(
+ "Number of hyperparameter sets to explore. After exploring this number "
+ "of sets, the optimization stops. Default = 100."
+ ),
)
parser.add_argument(
"--cv_splits",
@@ -69,26 +79,37 @@ def launch_optimizer():
"--cv_repeats",
type=int,
default=1,
- help="Number of times the cross-validation will be repeated with shuffling. Scores are reported as consensus between repeats. Default = 1.",
+ help=(
+ "Number of times the cross-validation will be repeated with shuffling. "
+ "Scores are reported as consensus between repeats. Default = 1."
+ ),
)
parser.add_argument(
"--earlystop_patience",
type=int,
default=0,
- help="Number of optimization steps that the best N solutions must not change for the early stopping. By default early stopping is not triggered.",
+ help=(
+ "Number of optimization steps that the best N solutions must not change "
+ "for the early stopping. By default early stopping is not triggered."
+ ),
)
parser.add_argument(
"--earlystop_leaders",
type=int,
default=1,
- help="Number N of best solutions that will be checked for the early stopping. Default = 1.",
+ help=(
+ "Number N of best solutions that will be checked for the early stopping. "
+ "Default = 1."
+ ),
)
parser.add_argument(
"--timeout",
type=int,
default=60,
- help="Timeout in sec. If a trial takes longer it will be killed. Default = 60.",
+ help=(
+ "Timeout in sec. If a trial takes longer it will be killed. Default = 60."
+ ),
)
parser.add_argument(
@@ -96,7 +117,10 @@ def launch_optimizer():
"--jobs",
type=int,
default=1,
- help="Number of processes that will be launched in parallel during the optimization. Default = 1.",
+ help=(
+ "Number of processes that will be launched in parallel during the "
+ "optimization. Default = 1."
+ ),
)
parser.add_argument(
"-m",
@@ -104,7 +128,9 @@ def launch_optimizer():
type=str,
default="SVR",
choices=["SVR", "SVC", "RFR", "RFC", "XGBR", "XGBC"],
- help="ML algorithm to be used for optimization. Only one can be used at a time.",
+ help=(
+ "ML algorithm to be used for optimization. Only one can be used at a time."
+ ),
)
# parser.add_argument('--multi', action='store_true')
parser.add_argument(
@@ -131,9 +157,8 @@ def launch_optimizer():
if os.path.exists(outdir):
print(
- "The output directory {} already exists. The data may be overwritten".format(
- outdir
- )
+ "The output directory {} already exists. The data may be "
+ "overwritten".format(outdir)
)
else:
os.makedirs(outdir)
diff --git a/doptools/cli/launch_preparer.py b/doptools/cli/launch_preparer.py
index ecf6557..0f0f0fa 100644
--- a/doptools/cli/launch_preparer.py
+++ b/doptools/cli/launch_preparer.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+# flake8: noqa
#
# Copyright 2022-2025 Pavel Sidorov This
# file is part of DOPTools repository.
@@ -23,8 +24,8 @@
import multiprocessing as mp
import os
import pickle
-import warnings
-from itertools import combinations, product
+from itertools import product
+from typing import Any, Dict, Iterable, List, Tuple
import numpy as np
import pandas as pd
@@ -34,7 +35,12 @@
from doptools.chem.chem_features import ComplexFragmentor, PassThrough
from doptools.chem.solvents import SolventVectorizer
from doptools.optimizer.config import get_raw_calculator
-from doptools.optimizer.preparer import *
+from doptools.optimizer.preparer import (
+ calculate_and_output,
+ check_parameters,
+ create_input,
+ create_output_dir,
+)
logging.basicConfig(
format="{asctime} - {levelname} - {message}",
@@ -42,7 +48,7 @@
datefmt="%Y-%m-%d %H:%M",
)
-basic_params = {
+basic_params: Dict[str, Any] = {
"circus": True,
"circus_min": [0],
"circus_max": [2, 3, 4],
@@ -76,7 +82,9 @@
}
-def _calculate_and_output(input_params):
+def _calculate_and_output(
+ input_params: Tuple[Any, pd.DataFrame, np.ndarray, str, str, bool, str]
+) -> None:
calculator, data, prop, prop_name, output_folder, pickles, fmt = input_params
desc = calculator.fit_transform(data)
@@ -112,8 +120,8 @@ def _calculate_and_output(input_params):
)
-def _perform_fullconfig(fullconfig):
- calculators = {}
+def _perform_fullconfig(fullconfig: Dict[str, Any]) -> None:
+ calculators: Dict[str, Any] = {}
if fullconfig["input_file"].endswith(".csv"):
data = pd.read_table(fullconfig["input_file"], sep=",")
@@ -129,17 +137,18 @@ def _perform_fullconfig(fullconfig):
for m in struct:
try:
m.canonicalize(fix_tautomers=False)
- except:
+ except Exception:
m.canonicalize(fix_tautomers=False)
data[s] = [str(m) for m in struct]
- y = data[fullconfig["property"]]
+ property_col = fullconfig["property"]
+ y = data[property_col]
indices = y[pd.notnull(y)].index
if len(indices) < len(data):
print(
- f"'{p}' column warning: only {len(indices)} out of {len(data)} instances have the property."
+ f"'{property_col}' column warning: only {len(indices)} out of {len(data)} instances have the property."
)
- print(f"Molecules that don't have the property will be discarded from the set.")
+ print("Molecules that don't have the property will be discarded from the set.")
y = y.iloc[indices]
data = data.iloc[indices]
@@ -152,7 +161,7 @@ def _perform_fullconfig(fullconfig):
fullconfig["separate_folders"] = False
- associators = []
+ associators: List[List[Tuple[str, Any]]] = []
for s in fullconfig["structures"].keys():
associators.append([])
for t, d in fullconfig["structures"][s].items():
@@ -170,9 +179,10 @@ def _perform_fullconfig(fullconfig):
if "numerical" in fullconfig.keys():
associators.append([("numerical", PassThrough(fullconfig["numerical"]))])
- for p in product(*associators):
+ for assoc in product(*associators):
cf = ComplexFragmentor(
- associator=p, structure_columns=list(fullconfig["structures"].keys())
+ associator=assoc,
+ structure_columns=list(fullconfig["structures"].keys()),
)
calculators[cf.short_name] = cf
else:
@@ -210,18 +220,18 @@ def _perform_fullconfig(fullconfig):
pool.join() # Wait for all the tasks to complete
-def _set_default(argument, default_values):
+def _set_default(argument: List[Any], default_values: List[Any]) -> List[Any]:
if len(argument) > 0:
return list(set(argument))
else:
return default_values
-def _enumerate_parameters(args):
- def _make_name(iterable):
+def _enumerate_parameters(args: Any) -> Dict[str, Dict[str, Any]]:
+ def _make_name(iterable: Iterable[Any]) -> str:
return "_".join([str(i) for i in iterable])
- param_dict = {}
+ param_dict: Dict[str, Dict[str, Any]] = {}
if args.morgan:
for nb in _set_default(args.morgan_nBits, [1024]):
for mr in _set_default(args.morgan_radius, [2]):
@@ -288,13 +298,15 @@ def _make_name(iterable):
return param_dict
-def _pickle_descriptors(output_dir, fragmentor, prop_name, desc_name):
+def _pickle_descriptors(
+ output_dir: str, fragmentor: Any, prop_name: str, desc_name: str
+) -> None:
fragmentor_name = os.path.join(output_dir, ".".join([prop_name, desc_name, "pkl"]))
with open(fragmentor_name, "wb") as f:
pickle.dump(fragmentor, f, pickle.HIGHEST_PROTOCOL)
-def launch_preparer():
+def launch_preparer() -> None:
parser = argparse.ArgumentParser(
prog="Descriptor calculator",
description="Prepares the descriptor files for hyperparameter optimization launch.",
diff --git a/doptools/cli/plotter.py b/doptools/cli/plotter.py
index c34dfa1..a05968b 100644
--- a/doptools/cli/plotter.py
+++ b/doptools/cli/plotter.py
@@ -19,10 +19,13 @@
import argparse
import warnings
+from typing import Any, Dict, Tuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
+from matplotlib.axes import Axes
+from matplotlib.figure import Figure
from sklearn.metrics import auc
from sklearn.metrics import mean_absolute_error as mae
from sklearn.metrics import roc_curve
@@ -33,7 +36,9 @@
warnings.simplefilter(action="ignore", category=DeprecationWarning)
-def make_regression_plot(predictions, errorbar=False, stats=False, title=""):
+def make_regression_plot(
+ predictions: str, errorbar: bool = False, stats: bool = False, title: str = ""
+) -> Tuple[Figure, Axes]:
fig, ax = plt.subplots(figsize=(4, 4), dpi=300, facecolor="white")
@@ -72,7 +77,9 @@ def make_regression_plot(predictions, errorbar=False, stats=False, title=""):
return fig, ax
-def prepare_classification_plot(cv_res, pos_class=1):
+def prepare_classification_plot(
+ cv_res: pd.DataFrame, pos_class: int = 1
+) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]:
prop_name = cv_res.columns[1].split(".")[0]
true_val = cv_res[prop_name + ".observed"].values
pos_label = [
@@ -121,7 +128,9 @@ def prepare_classification_plot(cv_res, pos_class=1):
return roc_repeats, roc_mean
-def make_classification_plot(predictions, class_number, **params):
+def make_classification_plot(
+ predictions: str, class_number: int, **params: Any
+) -> Tuple[Figure, Axes]:
cv_res = pd.read_table(predictions, sep=" ")
roc_repeats, roc_mean = prepare_classification_plot(cv_res, class_number)
fig, ax = plt.subplots(figsize=(5, 5), dpi=300, facecolor="w")
@@ -166,7 +175,7 @@ def make_classification_plot(predictions, class_number, **params):
return fig, ax
-def plotter():
+def plotter() -> None:
parser = argparse.ArgumentParser(
prog="Model CV plotter", description="Plot out the CV results of the optimizer"
)
diff --git a/doptools/cli/rebuilder.py b/doptools/cli/rebuilder.py
index 35353f6..172bd4d 100644
--- a/doptools/cli/rebuilder.py
+++ b/doptools/cli/rebuilder.py
@@ -18,58 +18,49 @@
# along with this program; if not, see .
import argparse
-import glob
import os
import pickle
import warnings
-from typing import Dict, Iterable, List, Optional, Tuple
+from datetime import datetime
+from typing import Any, Iterable, List, Optional, Tuple
import pandas as pd
from sklearn.feature_selection import VarianceThreshold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
+from doptools.chem.chem_features import ComplexFragmentor
+from doptools.estimators.consensus import ConsensusModel
from doptools.optimizer.config import get_raw_model
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
-import argparse
-import glob
-import os
-import pickle
-from typing import Dict, Iterable, List, Optional, Tuple
-
-import pandas as pd
-
-from doptools.optimizer.config import get_raw_model
-
-
class Rebuilder:
def __init__(
self,
- file: str = None,
- folders: List[str] = None,
- desc_folder: str = None,
+ file: Optional[str] = None,
+ folders: Optional[List[str]] = None,
+ desc_folder: Optional[str] = None,
ensemble: int = 1,
- score_threshold=0.5,
- ):
- self.file = file
- self.folders = folders
- self.desc_folder = desc_folder
+ score_threshold: float = 0.5,
+ ) -> None:
+ self.file: Optional[str] = file
+ self.folders: Optional[List[str]] = folders
+ self.desc_folder: Optional[str] = desc_folder
if self.file is None and self.folders is None:
raise ValueError(
"At least one file or folder should be given to rebuild models"
)
- self.ensemble = ensemble
- self.score_threshold = score_threshold
- self.prop = ""
- self.model = None
- self.trained = False
-
- def gather_trials(self, trials="all"):
- trial_files = []
+ self.ensemble: int = ensemble
+ self.score_threshold: float = score_threshold
+ self.prop: str = ""
+ self.model: Optional[Any] = None
+ self.trained: bool = False
+
+ def gather_trials(self, trials: str = "all") -> pd.DataFrame:
+ trial_files: List[str] = []
if self.folders is not None:
for f in self.folders:
trial_files.append(os.path.join(f, "trials." + trials))
@@ -88,11 +79,13 @@ def gather_trials(self, trials="all"):
)
return full_df
- def rebuild(self, one_per_descriptor=False):
+ def rebuild(self, one_per_descriptor: bool = False) -> None:
+ if self.desc_folder is None:
+ raise ValueError("desc_folder must be provided to rebuild models.")
trials = self.gather_trials()
trials = trials.sort_values(by="score", ascending=False)
- models = []
- selected_descs = []
+ models: List[Any] = []
+ selected_descs: List[str] = []
for i, row in trials.iterrows():
if len(models) >= self.ensemble:
@@ -142,7 +135,12 @@ def rebuild(self, one_per_descriptor=False):
else:
self.model = ConsensusModel(models)
- def train(self, train_set, train_prop, smiles_column=None):
+ def train(
+ self,
+ train_set: Any,
+ train_prop: Any,
+ smiles_column: Optional[str] = None,
+ ) -> None:
if self.model is None:
raise AttributeError(
"The model has not been created yet. Use rebuild function first."
@@ -153,7 +151,12 @@ def train(self, train_set, train_prop, smiles_column=None):
train_data = pd.read_excel(train_set)
elif train_set.endswith("csv"):
train_data = pd.read_table(train_set)
- if smiles_column is not None or isinstance(models[0][0], ComplexFragmentor):
+ descriptor = (
+ self.model.pipelines[0][0]
+ if isinstance(self.model, ConsensusModel)
+ else self.model[0]
+ )
+ if smiles_column is not None or isinstance(descriptor, ComplexFragmentor):
x_train = train_data[smiles_column]
else:
x_train = train_data
@@ -163,11 +166,12 @@ def train(self, train_set, train_prop, smiles_column=None):
self.model.fit(x_train, train_prop)
self.trained = True
- def save_model(self, save_dest):
+ def save_model(self, save_dest: str, trained: Optional[bool] = None) -> None:
+ if trained is not None:
+ self.trained = trained
if not os.path.exists(save_dest):
- os.makedirs(
- save_dest, exist_ok=True
- ) # exist_ok is useful when several processes try to create the folder at the same time
+ os.makedirs(save_dest, exist_ok=True)
+ # exist_ok helps when several processes try to create the folder at once
print("The output directory {} created".format(save_dest))
if self.model is None:
raise AttributeError(
@@ -195,13 +199,22 @@ def save_model(self, save_dest):
with open(os.path.join(save_dest, filename), "wb") as f:
pickle.dump(self.model, f, pickle.HIGHEST_PROTOCOL)
- def apply(self, test_set, smiles_column=None):
+ def apply(self, test_set: Any, smiles_column: Optional[str] = None) -> Any:
+ if self.model is None:
+ raise AttributeError(
+ "The model has not been created yet. Use rebuild function first."
+ )
if isinstance(test_set, str):
if test_set.endswith("xlsx") or test_set.endswith("xls"):
test_data = pd.read_excel(test_set)
elif test_set.endswith("csv"):
test_data = pd.read_table(test_set)
- if smiles_column is not None or isinstance(models[0][0], ComplexFragmentor):
+ descriptor = (
+ self.model.pipelines[0][0]
+ if isinstance(self.model, ConsensusModel)
+ else self.model[0]
+ )
+ if smiles_column is not None or isinstance(descriptor, ComplexFragmentor):
x_test = test_data[smiles_column]
else:
x_test = test_data
@@ -210,50 +223,64 @@ def apply(self, test_set, smiles_column=None):
results = self.model.predict(x_test)
return results
- def rebuild_save(self, save_dest, one_per_descriptor=False):
+ def rebuild_save(self, save_dest: str, one_per_descriptor: bool = False) -> None:
self.rebuild(one_per_descriptor)
self.save_model(save_dest)
def rebuild_train_save(
self,
- save_dest,
- train_set,
- train_prop,
- smiles_column=None,
- one_per_descriptor=False,
- ):
+ save_dest: str,
+ train_set: Any,
+ train_prop: Any,
+ smiles_column: Optional[str] = None,
+ one_per_descriptor: bool = False,
+ ) -> None:
self.rebuild(one_per_descriptor)
self.train(train_set, train_prop, smiles_column)
self.save_model(save_dest, trained=True)
def rebuild_train_apply(
self,
- train_set,
- train_prop,
- test_set,
- smiles_column=None,
- one_per_descriptor=False,
- ):
+ train_set: Any,
+ train_prop: Any,
+ test_set: Any,
+ smiles_column: Optional[str] = None,
+ one_per_descriptor: bool = False,
+ ) -> Any:
self.rebuild(one_per_descriptor)
self.train(train_set, train_prop, smiles_column)
results = self.apply(test_set, smiles_column)
return results
- def save_self(self, save_dest):
+ def save_self(self, save_dest: str) -> None:
with open(save_dest, "wb") as f:
pickle.dump(self, f, pickle.HIGHEST_PROTOCOL)
-def rebuilder():
+def rebuild_from_file(
+ descdir: str, modeldir: str, number: int
+) -> Tuple[Any, dict[str, Any]]:
+ raise NotImplementedError(
+ "rebuild_from_file is not implemented. Use Rebuilder.rebuild for now."
+ )
+
+
+def rebuilder() -> None:
parser = argparse.ArgumentParser(
prog="Optimized model rebuilder",
- description="Rebuilds the model from the optimized trial parameters,\nsaving it as an UNTRAINED pipeline in pickle",
+ description=(
+ "Rebuilds the model from the optimized trial parameters, saving it as "
+ "an UNTRAINED pipeline in pickle"
+ ),
)
parser.add_argument(
"-d",
"--descdir",
required=True,
- help="the folder containing descriptor files and calculators. Can contain folders separated by descriptor type",
+ help=(
+ "the folder containing descriptor files and calculators. Can contain "
+ "folders separated by descriptor type"
+ ),
)
parser.add_argument(
"-f", "--fileinput", help='the "trials.all" or "trails.best" file.'
@@ -261,7 +288,10 @@ def rebuilder():
parser.add_argument(
"-m",
"--modeldir",
- help='the folder containing model output files. Should contain "trials.all" file.',
+ help=(
+ 'the folder containing model output files. Should contain "trials.all" '
+ "file."
+ ),
)
parser.add_argument(
"-o", "--outdir", required=True, help="the output folder for the models."
@@ -271,13 +301,19 @@ def rebuilder():
"--ensemble",
type=int,
-        deafult=1,
+        default=1,
- help="the number of models that would be taken for an ensemble. Default 1 (non-ensemble).",
+ help=(
+ "the number of models that would be taken for an ensemble. Default 1 "
+ "(non-ensemble)."
+ ),
)
parser.add_argument(
"-e",
"--ensemble",
action="store_true",
- help="toggle to indicate that only one model per descriptor type is taken into ensemble",
+ help=(
+ "toggle to indicate that only one model per descriptor type is taken "
+ "into ensemble"
+ ),
)
args = parser.parse_args()
@@ -288,9 +324,8 @@ def rebuilder():
if os.path.exists(outdir):
print(
- "The output directory {} already exists. The data may be overwritten".format(
- outdir
- )
+ "The output directory {} already exists. The data may be "
+ "overwritten".format(outdir)
)
else:
os.makedirs(outdir)
diff --git a/doptools/estimators/ad_estimators.py b/doptools/estimators/ad_estimators.py
index 601841f..a64e3e1 100644
--- a/doptools/estimators/ad_estimators.py
+++ b/doptools/estimators/ad_estimators.py
@@ -1,32 +1,35 @@
from copy import deepcopy
+from typing import Any, Iterable, List, Optional, Union
+import pandas as pd
from pandas import DataFrame
-from sklearn.base import BaseEstimator, OutlierMixin, clone
+from sklearn.base import BaseEstimator, OutlierMixin
+from sklearn.datasets import load_svmlight_file
from sklearn.utils.validation import check_is_fitted
-import pandas as pd
-
class FragmentControl(BaseEstimator, OutlierMixin):
- def __init__(self, pipeline):
- self.pipeline = pipeline
- self.fragmentor = deepcopy(pipeline[0])
- self.feature_names = []
+ def __init__(self, pipeline: Any) -> None:
+ self.pipeline: Any = pipeline
+ self.fragmentor: Any = deepcopy(pipeline[0])
+ self.feature_names: List[str] = []
try:
check_is_fitted(self.pipeline)
self.feature_names = pipeline[0].get_feature_names()
- except:
+ except Exception:
print("The pipeline is not fitted, you should fit it.")
- def fit(self, X, y=None):
+ def fit(self, X: Any, y: Optional[Iterable[Any]] = None) -> "FragmentControl":
self.pipeline.fit(X, y)
self.fragmentor = deepcopy(self.pipeline[0])
self.feature_names = self.pipeline[0].get_feature_names()
self.is_fitted_ = True
return self
- def predict(self, X, y=None):
- res = []
+ def predict(
+ self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None
+ ) -> List[int]:
+ res: List[int] = []
for i in range(len(X)):
if isinstance(X, DataFrame):
x = X.iloc[i]
@@ -42,11 +45,16 @@ def predict(self, X, y=None):
class BoundingBox(BaseEstimator, OutlierMixin):
- def __init__(self, pipeline):
- self.pipeline = pipeline
- self.fragmentor = deepcopy(pipeline[0])
+ def __init__(self, pipeline: Any) -> None:
+ self.pipeline: Any = pipeline
+ self.fragmentor: Any = deepcopy(pipeline[0])
- def fit(self, X, y=None, svm_file=None):
+ def fit(
+ self,
+ X: Any,
+ y: Optional[Iterable[Any]] = None,
+ svm_file: Optional[str] = None,
+ ) -> "BoundingBox":
self.is_fitted_ = True
if svm_file is not None:
d, _ = load_svmlight_file(svm_file)
@@ -57,8 +65,10 @@ def fit(self, X, y=None, svm_file=None):
self.max_limits = descs.max(axis=0)
return self
- def predict(self, X, y=None):
- res = []
+ def predict(
+ self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None
+ ) -> List[int]:
+ res: List[int] = []
for i in range(len(X)):
if isinstance(X, DataFrame):
x = X.iloc[i]
@@ -77,23 +87,29 @@ def predict(self, X, y=None):
class PipelineWithAD(BaseEstimator):
- def __init__(self, pipeline, ad_type, threshold=None):
- self.ad_type = ad_type
- self.pipeline = pipeline
- self.threshold = threshold
+ def __init__(
+ self, pipeline: Any, ad_type: str, threshold: Optional[float] = None
+ ) -> None:
+ self.ad_type: str = ad_type
+ self.pipeline: Any = pipeline
+ self.threshold: Optional[float] = threshold
if self.ad_type == "FragmentControl":
self.ad_estimator = FragmentControl(self.pipeline)
elif self.ad_type == "BoundingBox":
-            self.ad_estimator = BoudingBox(self.pipeline)
+            self.ad_estimator = BoundingBox(
+                self.pipeline
+            )
- def fit(self, X, y=None):
+ def fit(self, X: Any, y: Optional[Iterable[Any]] = None) -> "PipelineWithAD":
self.is_fitted_ = True
self.pipeline.fit(X, y)
self.ad_estimator.fit(X, y)
return self
- def predict(self, X, y=None):
- res = []
+ def predict(
+ self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None
+ ) -> DataFrame:
+ res: List[tuple[Any, Any]] = []
for i in range(len(X)):
if isinstance(X, DataFrame):
x = X.iloc[i]
@@ -102,9 +118,11 @@ def predict(self, X, y=None):
res.append((self.pipeline.predict(x)[0], self.ad_estimator.predict(x)[0]))
return pd.DataFrame(res, columns=["Predicted", "AD"])
- def predict_within_AD(self, X, y=None):
- res = []
- indices = []
+ def predict_within_AD(
+ self, X: Union[DataFrame, List[Any]], y: Optional[Iterable[Any]] = None
+ ) -> DataFrame:
+ res: List[Any] = []
+ indices: List[int] = []
for i in range(len(X)):
if isinstance(X, DataFrame):
x = X.iloc[i]
diff --git a/doptools/estimators/consensus.py b/doptools/estimators/consensus.py
index 62fa410..e5ce78f 100644
--- a/doptools/estimators/consensus.py
+++ b/doptools/estimators/consensus.py
@@ -1,7 +1,7 @@
-from typing import Tuple
+from typing import Any, Iterable, List, Optional
-import pandas as pd
import numpy as np
+import pandas as pd
from sklearn import base
from sklearn.base import BaseEstimator
@@ -9,12 +9,12 @@
class ConsensusModel(BaseEstimator):
- def __init__(self, pipelines):
- self.model_type = "R"
- self.ad_type = None
- if isinstance(pipelines[0], Tuple):
- self.names = [p[0] for p in pipelines]
- self.pipelines = [p[1] for p in pipelines]
+ def __init__(self, pipelines: List[Any]) -> None:
+ self.model_type: str = "R"
+ self.ad_type: Optional[str] = None
+ if isinstance(pipelines[0], tuple):
+ self.names: List[str] = [p[0] for p in pipelines]
+ self.pipelines: List[Any] = [p[1] for p in pipelines]
else:
self.names = ["model" + str(i + 1) for i in range(len(pipelines))]
self.pipelines = pipelines
@@ -29,14 +29,19 @@ def __init__(self, pipelines):
if issubclass(self.pipelines[0][-1].__class__, base.ClassifierMixin):
self.model_type = "C"
- def fit(self, X, y=None):
+ def fit(self, X: Any, y: Optional[Iterable[Any]] = None) -> "ConsensusModel":
for p in self.pipelines:
p.fit(X, y)
self.is_fitted_ = True
return self
- def predict(self, X, y=None, output="all"):
- preds = []
+ def predict(
+ self,
+ X: Any,
+ y: Optional[Iterable[Any]] = None,
+ output: str = "all",
+ ) -> pd.DataFrame:
+ preds: List[Any] = []
if self.ad_type is None:
preds = np.array([p.predict(X) for p in self.pipelines]).T
@@ -68,7 +73,9 @@ def predict(self, X, y=None, output="all"):
elif output == "preds":
return res[self.names]
- def predict_within_AD(self, X, y=None, output="all"):
+ def predict_within_AD(
+ self, X: Any, y: Optional[Iterable[Any]] = None, output: str = "all"
+ ) -> pd.DataFrame:
if self.ad_type is None:
return self.predict(X, y, output)
else:
diff --git a/doptools/optimizer/__init__.py b/doptools/optimizer/__init__.py
index 9fecfd1..f71a746 100644
--- a/doptools/optimizer/__init__.py
+++ b/doptools/optimizer/__init__.py
@@ -16,6 +16,6 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-from .config import *
-from .optimizer import *
-from .preparer import *
+from .config import * # noqa: F401,F403
+from .optimizer import * # noqa: F401,F403
+from .preparer import * # noqa: F401,F403
diff --git a/doptools/optimizer/config.py b/doptools/optimizer/config.py
index 1ee70f5..4d4d509 100644
--- a/doptools/optimizer/config.py
+++ b/doptools/optimizer/config.py
@@ -16,11 +16,13 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
-from sklearn.svm import SVC, SVR
-from xgboost import XGBClassifier, XGBRegressor
+from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor # noqa: F401
+from sklearn.svm import SVC, SVR # noqa: F401
+from xgboost import XGBClassifier, XGBRegressor # noqa: F401
-from doptools.chem.chem_features import ChythonCircus, ChythonLinear, Fingerprinter
+from doptools.chem.chem_features import ChythonCircus # noqa: F401
+from doptools.chem.chem_features import ChythonLinear # noqa: F401
+from doptools.chem.chem_features import Fingerprinter # noqa: F401
methods = {
"SVR": "SVR(**params, gamma='auto')",
@@ -35,14 +37,14 @@
"circus": "ChythonCircus(**descriptor_params)",
"chyline": "ChythonLinear(**descriptor_params)",
"morgan": "Fingerprinter(fp_type='morgan', **descriptor_params)",
- "morganfeatures": "Fingerprinter(fp_type='morgan', params={'useFeatures':True}, **descriptor_params)",
+ "morganfeatures": "Fingerprinter(fp_type='morgan', params={'useFeatures':True}, **descriptor_params)", # noqa: E501
"rdkfp": "Fingerprinter(fp_type='rdkfp', **descriptor_params)",
- "rdkfplinear": "Fingerprinter(fp_type='rdkfp', params={'branchedPaths':False}, **descriptor_params)",
+ "rdkfplinear": "Fingerprinter(fp_type='rdkfp', params={'branchedPaths':False}, **descriptor_params)", # noqa: E501
"layered": "Fingerprinter(fp_type='layered', **descriptor_params)",
"atompairs": "Fingerprinter(fp_type='atompairs', **descriptor_params)",
"avalon": "Fingerprinter(fp_type='avalon', **descriptor_params)",
"torsion": "Fingerprinter(fp_type='torsion', **descriptor_params)",
- #'mordred2d': "Mordred2DCalculator(**descriptor_params)",
+ # 'mordred2d': "Mordred2DCalculator(**descriptor_params)",
}
diff --git a/doptools/optimizer/optimizer.py b/doptools/optimizer/optimizer.py
index 0eea8eb..0989985 100644
--- a/doptools/optimizer/optimizer.py
+++ b/doptools/optimizer/optimizer.py
@@ -23,12 +23,13 @@
import os
import warnings
from functools import partial
-from multiprocessing import Manager
+from typing import Any, Dict, MutableMapping, Optional, Tuple, Union
import numpy as np
import optuna
import pandas as pd
from optuna.study import StudyDirection
+from pandas import DataFrame
from scipy.sparse import issparse
from sklearn.datasets import load_svmlight_file
from sklearn.feature_selection import VarianceThreshold
@@ -55,11 +56,11 @@
class TopNPatienceCallback:
- def __init__(self, patience: int, leaders: int = 1):
- self.patience = patience
- self.leaders = leaders
- self._leaders_unchanged_steps = 0
- self._previous_leaders = ()
+ def __init__(self, patience: int, leaders: int = 1) -> None:
+ self.patience: int = patience
+ self.leaders: int = leaders
+ self._leaders_unchanged_steps: int = 0
+ self._previous_leaders: Tuple[int, ...] = ()
def __call__(
self, study: optuna.study.Study, trial: optuna.trial.FrozenTrial
@@ -92,19 +93,20 @@ def __call__(
study.stop()
-def collect_data(datadir, task, fmt="svm"):
- desc_dict = {}
- y = {}
+def collect_data(
+ datadir: str, task: str, fmt: str = "svm"
+) -> Tuple[Dict[str, Any], DataFrame]:
+ desc_dict: Dict[str, Any] = {}
+ y: Dict[str, Any] = {}
for f in glob.glob(os.path.join(datadir, "*." + fmt)):
propname = f.split(os.sep)[-1].split(".")[0]
name = f.split(os.sep)[-1][len(propname) + 1 : -4]
- fullname = f.split(os.sep)[-1]
if fmt == "svm":
desc_dict[name], y[propname] = load_svmlight_file(f)
elif fmt == "csv":
data = pd.read_table(f)
y[propname] = data[propname]
- col_idx = list(data.columns).index()
+ col_idx = list(data.columns).index(propname)
desc_dict[name] = data.iloc[:, col_idx + 1 :]
if task.endswith("C"):
return desc_dict, pd.DataFrame(y, dtype=int)
@@ -112,8 +114,10 @@ def collect_data(datadir, task, fmt="svm"):
return desc_dict, pd.DataFrame(y)
-def calculate_scores(task, obs, pred):
- def create_row(task, stat_name, x, y):
+def calculate_scores(task: str, obs: DataFrame, pred: DataFrame) -> DataFrame:
+ def create_row(
+ task: str, stat_name: str, x: pd.Series, y: pd.Series
+ ) -> Dict[str, Union[str, float]]:
if task == "R":
return {
"stat": stat_name,
@@ -143,6 +147,7 @@ def create_row(task, stat_name, x, y):
"F1": f1_score(x, y, average="macro"),
"MCC": matthews_corrcoef(x, y),
}
+ raise ValueError("Unknown task type")
if task == "R":
score_df = pd.DataFrame(columns=["stat", "R2", "RMSE", "MAE"])
@@ -173,21 +178,21 @@ def create_row(task, stat_name, x, y):
def objective_study(
- storage,
- results_detailed,
- trial,
- x_dict,
- y,
- outdir,
- method,
- ntrials,
- cv_splits,
- cv_repeats,
- jobs,
- tmout,
- earlystop,
+ storage: MutableMapping[int, Dict[str, Any]],
+ results_detailed: MutableMapping[int, Dict[str, Any]],
+ trial: optuna.trial.Trial,
+ x_dict: Dict[str, Any],
+ y: DataFrame,
+ outdir: str,
+ method: str,
+ ntrials: int,
+ cv_splits: int,
+ cv_repeats: int,
+ jobs: int,
+ tmout: int,
+ earlystop: Tuple[int, int],
write_output: bool = True,
-):
+) -> float:
n = trial.number
if write_output and not os.path.exists(os.path.join(outdir, "trial." + str(n))):
os.mkdir(os.path.join(outdir, "trial." + str(n)))
@@ -208,8 +213,6 @@ def objective_study(
X = VarianceThreshold().fit_transform(X)
params = suggest_params(trial, method)
- # storage[n] = {"fit_score":fscore, 'desc': desc, 'scaling': scaling, 'method': method, **params}
-
model = get_raw_model(method, params)
Y = np.array(y[y.columns[0]])
@@ -248,7 +251,7 @@ def objective_study(
score_df = calculate_scores(method[-1], y, res_pd)
- fit_scores = {}
+ fit_scores: Dict[str, Union[str, float]] = {}
model.fit(X, Y)
fit_preds = model.predict(X)
if method.endswith("R"):
@@ -329,21 +332,21 @@ def objective_study(
def run_objective_study_with_timeout(
- storage,
- results_detailed,
- x_dict,
- y,
- outdir,
- method,
- ntrials,
- cv_splits,
- cv_repeats,
- jobs,
- tmout,
- earlystop,
- write_output,
- trial,
-):
+ storage: MutableMapping[int, Dict[str, Any]],
+ results_detailed: MutableMapping[int, Dict[str, Any]],
+ x_dict: Dict[str, Any],
+ y: DataFrame,
+ outdir: str,
+ method: str,
+ ntrials: int,
+ cv_splits: int,
+ cv_repeats: int,
+ jobs: int,
+ tmout: int,
+ earlystop: Tuple[int, int],
+ write_output: bool,
+ trial: optuna.trial.Trial,
+) -> float:
timeouted_objective = timeout_decorator.timeout(
tmout, timeout_exception=optuna.TrialPruned, use_signals=False
)(objective_study)
@@ -366,22 +369,22 @@ def run_objective_study_with_timeout(
def launch_study(
- x_dict,
- y,
- outdir,
- method,
- ntrials,
- cv_splits,
- cv_repeats,
- jobs,
- tmout,
- earlystop,
+ x_dict: Dict[str, Any],
+ y: DataFrame,
+ outdir: str,
+ method: str,
+ ntrials: int,
+ cv_splits: int,
+ cv_repeats: int,
+ jobs: int,
+ tmout: int,
+ earlystop: Tuple[int, int],
write_output: bool = True,
-):
+) -> Optional[Tuple[DataFrame, Dict[int, Any]]]:
ctx = mp.get_context()
with ctx.Manager() as manager:
- results_dict = manager.dict()
- results_detailed = manager.dict()
+ results_dict: MutableMapping[int, Dict[str, Any]] = manager.dict()
+ results_detailed: MutableMapping[int, Dict[str, Any]] = manager.dict()
study = optuna.create_study(
direction="maximize", sampler=optuna.samplers.TPESampler()
@@ -414,25 +417,25 @@ def launch_study(
**kwargs_opt
)
- results_dict = dict(results_dict)
- results_detailed = dict(results_detailed)
+ results_dict_local = dict(results_dict)
+ results_detailed_local = dict(results_detailed)
- hyperparam_names = list(results_dict[next(iter(results_dict))].keys())
+ hyperparam_names = list(results_dict_local[next(iter(results_dict_local))].keys())
results_pd = pd.DataFrame(columns=["trial"] + hyperparam_names + ["score"])
intermediate = study.trials_dataframe(attrs=("number", "value"))
for i, row in intermediate.iterrows():
number = int(row.number)
- if number not in results_dict:
+ if number not in results_dict_local:
continue
added_row = {
"trial": number,
"score": row.value,
- "fit_score": results_dict[number]["fit_score"],
+ "fit_score": results_dict_local[number]["fit_score"],
}
for hp in hyperparam_names:
- added_row[hp] = results_dict[number][hp]
+ added_row[hp] = results_dict_local[number][hp]
results_pd = pd.concat(
[pd.DataFrame([added_row]), results_pd.loc[:]]
@@ -444,7 +447,8 @@ def launch_study(
os.path.join(outdir, "trials.best"), sep=" ", index=False
)
else:
- return results_pd, results_detailed
+ return results_pd, results_detailed_local
+ return None
__all__ = ["calculate_scores", "collect_data", "launch_study"]
diff --git a/doptools/optimizer/preparer.py b/doptools/optimizer/preparer.py
index 2c231be..d2ae87b 100644
--- a/doptools/optimizer/preparer.py
+++ b/doptools/optimizer/preparer.py
@@ -16,12 +16,11 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, see .
-import argparse
-import json
-import multiprocessing as mp
+
import os
import pickle
import warnings
+from typing import Any, Dict, Iterable, List, Tuple
import numpy as np
import pandas as pd
@@ -36,18 +35,18 @@
warnings.simplefilter(action="ignore", category=DeprecationWarning)
-def _set_default(argument, default_values):
+def _set_default(argument: List[Any], default_values: List[Any]) -> List[Any]:
if len(argument) > 0:
return list(set(argument))
else:
return default_values
-def _enumerate_parameters(args):
- def _make_name(iterable):
+def _enumerate_parameters(args: Any) -> Dict[str, Dict[str, Any]]:
+ def _make_name(iterable: Iterable[Any]) -> str:
return "_".join([str(i) for i in iterable])
- param_dict = {}
+ param_dict: Dict[str, Dict[str, Any]] = {}
if args.morgan:
for nb in _set_default(args.morgan_nBits, [1024]):
for mr in _set_default(args.morgan_radius, [2]):
@@ -114,13 +113,15 @@ def _make_name(iterable):
return param_dict
-def _pickle_descriptors(output_dir, fragmentor, prop_name, desc_name):
+def _pickle_descriptors(
+ output_dir: str, fragmentor: Any, prop_name: str, desc_name: str
+) -> None:
fragmentor_name = os.path.join(output_dir, ".".join([prop_name, desc_name, "pkl"]))
with open(fragmentor_name, "wb") as f:
pickle.dump(fragmentor, f, pickle.HIGHEST_PROTOCOL)
-def check_parameters(params):
+def check_parameters(params: Any) -> None:
if not params.input:
raise ValueError("No input file.")
if params.input.split(".")[-1] not in ("csv", "xls", "xlsx"):
@@ -128,18 +129,20 @@ def check_parameters(params):
for i, p in enumerate(params.property_col):
if " " in p and len(params.property_names) < (i + 1):
raise ValueError(
- f"Column name {p} contains spaces in the name.\nPlease provide alternative names with --property_names option."
+ f"Column name {p} contains spaces in the name.\n"
+ "Please provide alternative names with --property_names option."
)
if params.property_names:
if len(params.property_col) != len(params.property_names):
raise ValueError(
- "The number of alternative names is not equal to the number of properties."
+ "The number of alternative names is not equal to the number of "
+ "properties."
)
-def create_input(input_params):
- input_dict = {}
- structures = []
+def create_input(input_params: Dict[str, Any]) -> Dict[str, Any]:
+ input_dict: Dict[str, Any] = {}
+ structures: List[Any] = []
if input_params["input_file"].endswith("csv"):
data_table = pd.read_table(input_params["input_file"], sep=",")
@@ -162,7 +165,7 @@ def create_input(input_params):
for m in structures:
try:
m.canonicalize(fix_tautomers=False)
- except:
+ except Exception:
m.canonicalize(fix_tautomers=False)
input_dict["structures"][col] = structures
# input_dict['structures'] = structures
@@ -178,10 +181,12 @@ def create_input(input_params):
indices = list(y[pd.notnull(y)].index)
if len(indices) < len(structures):
print(
- f"'{p}' column warning: only {len(indices)} out of {len(structures)} instances have the property."
+ f"'{p}' column warning: only {len(indices)} out of "
+ f"{len(structures)} instances have the property."
)
print(
- f"Molecules that don't have the property will be discarded from the set."
+ "Molecules that don't have the property will be discarded from the "
+ "set."
)
y = y.iloc[indices]
y = np.array(y)
@@ -199,9 +204,14 @@ def create_input(input_params):
return input_dict
-def calculate_descriptor_table(input_dict, desc_name, descriptor_params, out="all"):
+def calculate_descriptor_table(
+ input_dict: Dict[str, Any],
+ desc_name: str,
+ descriptor_params: Dict[str, Any],
+ out: str = "all",
+) -> Any:
desc_type = desc_name.split("_")[0]
- result = {"name": desc_name, "type": desc_type}
+ result: Dict[str, Any] = {"name": desc_name, "type": desc_type}
for k, d in input_dict.items():
if k.startswith("prop"):
base_column = list(input_dict["structures"].columns)[0]
@@ -260,7 +270,9 @@ def calculate_descriptor_table(input_dict, desc_name, descriptor_params, out="al
raise ValueError("The return value is not in the result dictionary")
-def output_descriptors(calculated_result, output_params):
+def output_descriptors(
+ calculated_result: Dict[str, Any], output_params: Dict[str, Any]
+) -> None:
desc_name = calculated_result["name"]
desc_type = calculated_result["type"]
@@ -268,9 +280,8 @@ def output_descriptors(calculated_result, output_params):
if output_params["separate"]:
output_folder = os.path.join(output_folder, desc_type)
if not os.path.exists(output_folder):
- os.makedirs(
- output_folder, exist_ok=True
- ) # exist_ok is useful when several processes try to create the folder at the same time
+ os.makedirs(output_folder, exist_ok=True)
+ # exist_ok helps when several processes try to create the folder at once
print("The output directory {} created".format(output_folder))
for k, d in calculated_result.items():
if k.startswith("prop"):
@@ -299,18 +310,19 @@ def output_descriptors(calculated_result, output_params):
)
-def calculate_and_output(input_args):
+def calculate_and_output(
+ input_args: Tuple[Dict[str, Any], str, Dict[str, Any], Dict[str, Any]]
+) -> None:
inpt, desc, descriptor_params, output_params = input_args
result = calculate_descriptor_table(inpt, desc, descriptor_params)
output_descriptors(result, output_params)
-def create_output_dir(outdir):
+def create_output_dir(outdir: str) -> None:
if os.path.exists(outdir):
print(
- "The output directory {} already exists. The data may be overwritten".format(
- outdir
- )
+ "The output directory {} already exists. The data may be "
+ "overwritten".format(outdir)
)
else:
os.makedirs(outdir)
diff --git a/setup.py b/setup.py
index 2571f3d..2c01e2c 100644
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,10 @@
"openpyxl>=3.1",
"pillow>=11.2.1",
],
- description="A package for calculation of molecular descriptors in Scikit-Learn compatible way and model optimization",
+ description=(
+ "A package for calculation of molecular descriptors in Scikit-Learn "
+ "compatible way and model optimization"
+ ),
long_description=(Path(__file__).parent / "README.rst")
.open(encoding="utf-8")
.read(),
@@ -73,7 +76,8 @@
"Intended Audience :: Developers",
"Topic :: Scientific/Engineering :: Chemistry",
"Topic :: Software Development :: Libraries :: Python Modules",
- "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)",
+ "License :: OSI Approved :: GNU Lesser General Public License v3 or later "
+ "(LGPLv3+)",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
diff --git a/tests/chem/test_chem_features.py b/tests/chem/test_chem_features.py
index d63b78e..7e068e7 100644
--- a/tests/chem/test_chem_features.py
+++ b/tests/chem/test_chem_features.py
@@ -1,11 +1,16 @@
"""Tests for chem_features module."""
-from __future__ import annotations
-
import pandas as pd
import pandas.testing as pdt
import pytest
+from doptools.chem.chem_features import (
+ ChythonCircus,
+ ChythonLinear,
+ ComplexFragmentor,
+ Fingerprinter,
+ PassThrough,
+)
from tests.conftest import (
CHEM_CHYLINE_UPPER,
CHEM_CIRCUS_UPPER,
@@ -15,14 +20,6 @@
CHEM_RDKFP_RADIUS,
)
-from doptools.chem.chem_features import (
- ChythonCircus,
- ChythonLinear,
- ComplexFragmentor,
- Fingerprinter,
- PassThrough,
-)
-
@pytest.mark.parametrize("upper", CHEM_CIRCUS_UPPER)
def test_chython_circus_counts_basic(
diff --git a/tests/chem/test_coloratom.py b/tests/chem/test_coloratom.py
index bcad5c5..bf19716 100644
--- a/tests/chem/test_coloratom.py
+++ b/tests/chem/test_coloratom.py
@@ -1,7 +1,5 @@
"""Tests for coloratom helpers."""
-from __future__ import annotations
-
from doptools.chem.coloratom import ColorAtom
diff --git a/tests/chem/test_solvents.py b/tests/chem/test_solvents.py
index ed59534..d07d4e0 100644
--- a/tests/chem/test_solvents.py
+++ b/tests/chem/test_solvents.py
@@ -1,7 +1,5 @@
"""Tests for solvents module."""
-from __future__ import annotations
-
import pandas.testing as pdt
from doptools.chem.solvents import SolventVectorizer
diff --git a/tests/chem/test_utils.py b/tests/chem/test_utils.py
index e553388..d5efc93 100644
--- a/tests/chem/test_utils.py
+++ b/tests/chem/test_utils.py
@@ -1,7 +1,5 @@
"""Tests for chem utils module."""
-from __future__ import annotations
-
from dataclasses import dataclass
import pytest
diff --git a/tests/cli/test_ensemble_model_rebuilding.py b/tests/cli/test_ensemble_model_rebuilding.py
index 8069349..f17e697 100644
--- a/tests/cli/test_ensemble_model_rebuilding.py
+++ b/tests/cli/test_ensemble_model_rebuilding.py
@@ -1,7 +1,5 @@
"""Stub tests for ensemble_model_rebuilding CLI."""
-from __future__ import annotations
-
import pytest
diff --git a/tests/cli/test_launch_optimizer.py b/tests/cli/test_launch_optimizer.py
index c629282..5fece42 100644
--- a/tests/cli/test_launch_optimizer.py
+++ b/tests/cli/test_launch_optimizer.py
@@ -1,10 +1,7 @@
"""Tests for launch_optimizer CLI."""
-from __future__ import annotations
-
-import sys
-
import importlib
+import sys
import pandas as pd
diff --git a/tests/cli/test_launch_preparer.py b/tests/cli/test_launch_preparer.py
index 61dda19..bb8ebdd 100644
--- a/tests/cli/test_launch_preparer.py
+++ b/tests/cli/test_launch_preparer.py
@@ -1,7 +1,5 @@
"""Tests for launch_preparer CLI."""
-from __future__ import annotations
-
import importlib
import sys
@@ -10,7 +8,7 @@
class _DummyPool:
def __init__(self) -> None:
- self.mapped = []
+ self.mapped: list[tuple[object, list[object]]] = []
def map(self, func, iterable):
self.mapped.append((func, list(iterable)))
diff --git a/tests/cli/test_plotter.py b/tests/cli/test_plotter.py
index 9f5ebb4..02dd6bb 100644
--- a/tests/cli/test_plotter.py
+++ b/tests/cli/test_plotter.py
@@ -1,7 +1,5 @@
"""Stub tests for plotter CLI."""
-from __future__ import annotations
-
import pytest
diff --git a/tests/cli/test_rebuilder.py b/tests/cli/test_rebuilder.py
index a14b3bc..bc9e302 100644
--- a/tests/cli/test_rebuilder.py
+++ b/tests/cli/test_rebuilder.py
@@ -1,7 +1,5 @@
"""Stub tests for rebuilder CLI."""
-from __future__ import annotations
-
import pytest
diff --git a/tests/conftest.py b/tests/conftest.py
index 1e61abb..e3b4cd9 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,12 +1,10 @@
"""Shared pytest fixtures for doptools tests."""
-from __future__ import annotations
-
from pathlib import Path
from typing import Any
import pytest
-import yaml
+import yaml # type: ignore[import-untyped]
_ROOT = Path(__file__).resolve().parent
CHEM_DATA_DIR = _ROOT / "data" / "chem"
diff --git a/tests/data/chem/generate_expected.py b/tests/data/chem/generate_expected.py
index 4bf719b..28eb81d 100644
--- a/tests/data/chem/generate_expected.py
+++ b/tests/data/chem/generate_expected.py
@@ -1,12 +1,10 @@
"""Generate expected descriptor outputs for chem tests."""
-from __future__ import annotations
-
from pathlib import Path
-from typing import Iterable, Any
+from typing import Any, Iterable
import pandas as pd
-import yaml
+import yaml # type: ignore[import-untyped]
from doptools.chem.chem_features import (
ChythonCircus,
@@ -16,7 +14,6 @@
PassThrough,
)
-
ROOT = Path(__file__).resolve().parent
CONFIG_PATH = ROOT / "config.yaml"
@@ -31,8 +28,9 @@ def _write_csv(df: Any | pd.DataFrame, path: Path) -> None:
df.to_csv(path, index=False)
-def _smiles_to_dataframe(smiles: Iterable[str], numeric_values: list[int]
- ) -> pd.DataFrame:
+def _smiles_to_dataframe(
+ smiles: Iterable[str], numeric_values: list[int]
+) -> pd.DataFrame:
return pd.DataFrame({"mol": list(smiles), "num": numeric_values})
@@ -74,9 +72,10 @@ def generate() -> None:
for radius in params["rdkfp_radius"]:
fragmentor = ComplexFragmentor(
associator=[
- ("mol", Fingerprinter(fp_type="rdkfp",
- nBits=n_bits,
- radius=radius)),
+ (
+ "mol",
+ Fingerprinter(fp_type="rdkfp", nBits=n_bits, radius=radius),
+ ),
("numerical", PassThrough(["num"])),
],
structure_columns=["mol"],
diff --git a/tests/estimators/test_ad_estimators.py b/tests/estimators/test_ad_estimators.py
index 3e44835..9128076 100644
--- a/tests/estimators/test_ad_estimators.py
+++ b/tests/estimators/test_ad_estimators.py
@@ -1,8 +1,5 @@
"""Tests for ad_estimators module."""
-from __future__ import annotations
-
-import pandas as pd
import pytest
from sklearn.dummy import DummyRegressor
from sklearn.pipeline import Pipeline
diff --git a/tests/estimators/test_consensus.py b/tests/estimators/test_consensus.py
index 420c577..4baba7a 100644
--- a/tests/estimators/test_consensus.py
+++ b/tests/estimators/test_consensus.py
@@ -1,7 +1,5 @@
"""Tests for consensus module."""
-from __future__ import annotations
-
import pandas as pd
from sklearn.dummy import DummyRegressor
from sklearn.pipeline import Pipeline
diff --git a/tests/optimizer/test_config.py b/tests/optimizer/test_config.py
index bd67dc5..5adcee7 100644
--- a/tests/optimizer/test_config.py
+++ b/tests/optimizer/test_config.py
@@ -1,7 +1,5 @@
"""Tests for optimizer config helpers."""
-from __future__ import annotations
-
from sklearn.svm import SVR
from doptools.chem.chem_features import Fingerprinter
diff --git a/tests/optimizer/test_optimizer.py b/tests/optimizer/test_optimizer.py
index f1e2fc7..0503789 100644
--- a/tests/optimizer/test_optimizer.py
+++ b/tests/optimizer/test_optimizer.py
@@ -1,7 +1,5 @@
"""Tests for optimizer module."""
-from __future__ import annotations
-
import numpy as np
import optuna
import pandas as pd
diff --git a/tests/optimizer/test_preparer.py b/tests/optimizer/test_preparer.py
index 06cf915..0cde911 100644
--- a/tests/optimizer/test_preparer.py
+++ b/tests/optimizer/test_preparer.py
@@ -1,7 +1,5 @@
"""Tests for preparer module."""
-from __future__ import annotations
-
from types import SimpleNamespace
import numpy as np
diff --git a/tests/optimizer/test_utils.py b/tests/optimizer/test_utils.py
index bacb94e..0639905 100644
--- a/tests/optimizer/test_utils.py
+++ b/tests/optimizer/test_utils.py
@@ -1,7 +1,5 @@
"""Tests for optimizer utils."""
-from __future__ import annotations
-
import numpy as np
import pytest