Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions retropath2_wrapper/Args.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
)
# Default values used for command-line options and wrapper parameters.
# NOTE(review): 'RP2_VERSION' is listed twice below; in a dict literal the
# later entry wins. This looks like the before/after lines of a diff - confirm.
DEFAULTS = {
'MSC_TIMEOUT': 10, # minutes
'RP2_VERSION': 'r20250728',
'RP2_VERSION': 'r20260119',
'KNIME_FOLDER': __PACKAGE_FOLDER,
"STD_HYDROGEN": "auto", # How hydrogens are represented in chemical rules
"SCORE_MODE": "auto", # How to prioritize rules according to their score
}
RETCODES = {
'OK': 0,
Expand Down Expand Up @@ -95,7 +96,7 @@ def _add_arguments(parser):
'--rp2_version',
type=str,
default=DEFAULTS['RP2_VERSION'],
choices=['v9', 'r20210127', 'r20220104', "r20220224", "r20250728"],
choices=['v9', 'r20210127', 'r20220104', "r20220224", "r20250728", "r20260119"],
help=f'version of RetroPath2.0 workflow (default: {DEFAULTS["RP2_VERSION"]}).'
)

Expand All @@ -116,6 +117,17 @@ def _add_arguments(parser):
choices=["auto", "implicit", "explicit"],
help="How hydrogens are represented in chemical rules, auto mode will try to guess from the chemical rules",
)
# Let the user force the score direction or leave it to 'auto'.
# The help text must spell the values exactly as listed in `choices`,
# otherwise a user copying the help gets an argparse "invalid choice" error.
parser_rp.add_argument(
    "--score_mode",
    default=DEFAULTS["SCORE_MODE"],
    choices=["auto", "maximize", "minimize"],
    help=(
        "How to prioritize rules according to their score:"
        " 'maximize' (higher is better), 'minimize' (lower is better),"
        " 'auto' (will try to guess from the chemical rules)."
        f" Default: '{DEFAULTS['SCORE_MODE']}'"
    )
)

# Program options
parser_sp = parser.add_argument_group("Logging")
Expand Down
143 changes: 141 additions & 2 deletions retropath2_wrapper/RetroPath2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@description: Python wrapper to run RetroPath2.0 KNIME workflow

"""
import csv
import gzip
import tarfile
import zipfile
Expand Down Expand Up @@ -49,6 +50,7 @@ def retropath2(
rules_file: str,
outdir: str,
std_hydrogen: str,
score_mode: str,
knime: Knime | None,
rp2_version: str | None = DEFAULTS['RP2_VERSION'],
max_steps: int = 3,
Expand All @@ -65,6 +67,7 @@ def retropath2(
logger.debug(f'rules_file: {rules_file}')
logger.debug(f'outdir: {outdir}')
logger.debug(f'std_hydrogen: {std_hydrogen}')
logger.debug(f'score_mode: {score_mode}')
logger.debug(f'rp2_version: {rp2_version}')
logger.debug(f'max_steps: {max_steps}')
logger.debug(f'topx: {topx}')
Expand Down Expand Up @@ -102,6 +105,7 @@ def retropath2(
'dmax' : dmax,
'mwmax_source' : mwmax_source,
'std_hydrogen' : std_hydrogen,
'score_mode' : score_mode,
}
logger.debug('rp2_params: ' + str(rp2_params))

Expand Down Expand Up @@ -372,8 +376,13 @@ def format_files_for_knime(
}
# Because KNIME accepts only '.csv' file extension,
# files have to be renamed
allowed_extensions = {
'sink' : ['.csv'],
'source' : ['.csv'],
'rules' : ['.csv', '.tsv'],
}
for key in ['sink', 'source', 'rules']:
if os_path.splitext(files[key])[-1] != '.csv':
if os_path.splitext(files[key])[-1] not in allowed_extensions[key]:
new_f = os_path.join(
indir,
os_path.basename(files[key])+'.csv'
Expand All @@ -383,6 +392,136 @@ def format_files_for_knime(

return files

def _collect_lines(handle, n: int) -> list[str]:
    """
    Read lines from an already opened file-like object.

    Parameters
    ----------
    handle : iterable of str or bytes
        Open text or binary stream, iterated line by line.
    n : int
        Maximum number of lines to read. If n <= 0, read all lines.

    Returns
    -------
    list[str]
        The lines read, stripped of trailing whitespace. Binary lines are
        decoded as UTF-8, silently dropping undecodable bytes.
    """
    collected = []
    for index, raw in enumerate(handle):
        if n > 0 and index >= n:
            break
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8", errors="ignore")
        collected.append(raw.rstrip())
    return collected


# Function to return the first lines of a file (as a list)
def get_first_lines(path: str, n: int = 10) -> list[str]:
    """
    Return the first n lines of a file.

    The container format is chosen from the file name: '.tar' / '.tar.gz'
    archives, '.gz' single files, '.zip' archives, and plain text for
    anything else. For archives, only the first regular file is read.

    Parameters
    ----------
    path : str
        Path of the file.
    n : int
        Number of lines to return. If n <= 0, return all lines.

    Returns
    -------
    list[str]
        The first n lines of the file, without trailing whitespace.
        Empty if an archive contains no readable regular file.
    """
    # --- Tar or Tar.gz archive ---
    # Tested before the plain '.gz' case so that '.tar.gz' is not
    # mistaken for a single gzip-compressed file.
    if path.endswith((".tar", ".tar.gz")):
        mode = "r:gz" if path.endswith(".gz") else "r:"
        with tarfile.open(path, mode) as tar:
            for member in tar:
                if not member.isfile():
                    continue
                handle = tar.extractfile(member)
                if handle is None:
                    continue
                # Close the member stream once read (only first file)
                with handle:
                    return _collect_lines(handle, n)
        return []

    # --- Gzip compressed single file ---
    if path.endswith(".gz"):
        with gzip.open(path, "rt", encoding="utf-8", errors="ignore") as handle:
            return _collect_lines(handle, n)

    # --- Zip archive ---
    if path.endswith(".zip"):
        with zipfile.ZipFile(path, "r") as zf:
            for info in zf.infolist():
                # Directory entries hold no data: opening one would yield
                # zero lines and hide the actual file that follows it.
                if info.is_dir() or info.filename.startswith("_"):
                    continue
                with zf.open(info) as handle:
                    return _collect_lines(handle, n)
        return []

    # --- Plain text ---
    with open(path, "rt", encoding="utf-8", errors="ignore") as handle:
        return _collect_lines(handle, n)


def sniff_score_mode(
    path: str,
    default_mode: str = "maximize",
    logger: Logger = getLogger(__name__),
) -> str:
    """
    Sniff the scoring mode used in reaction rules.

    The file is parsed as a delimited table. If it has a header with a
    'score' column (case-insensitive, exact name), the values of that
    column decide the mode: any score > 1 means 'minimize', otherwise
    'maximize'. In every other situation the default mode is returned.

    Parameters
    ----------
    path : str
        Path of the file.
    default_mode : str
        Default scoring mode, returned when the mode cannot be guessed.
    logger : Logger
        The logger object.

    Returns
    -------
    str
        The scoring mode: 'minimize' or 'maximize' (or default_mode).

    """
    lines = get_first_lines(path, n=-1)
    sample = "\n".join(lines[:10])

    # The sniffer raises csv.Error when it cannot work out a delimiter
    # (e.g. empty or single-column file): fall back instead of crashing.
    sniffer = csv.Sniffer()
    try:
        dialect = sniffer.sniff(sample)
        has_header = sniffer.has_header(sample)
    except csv.Error:
        logger.info(
            "Could not determine the format of reaction rules."
            " Returning scoring mode: %s.", default_mode
        )
        return default_mode

    # Check if header is present
    if not has_header:
        logger.info(
            "No header detected in reaction rules."
            " Returning scoring mode: %s.", default_mode
        )
        return default_mode

    # Look for 'score' column.
    # Match the parsed field name exactly: a substring test on the raw
    # header line would also accept e.g. 'score_normalized' alone.
    rows = csv.reader(lines, dialect=dialect)
    header = [name.strip().lower() for name in next(rows, [])]
    if "score" not in header:
        logger.info(
            "No 'score' column detected in reaction rules."
            " Returning scoring mode: %s.", default_mode
        )
        return default_mode
    score_idx = header.index("score")

    # Inspect all score values, ignoring rows that are too short
    # or whose score is not a number.
    scores = []
    for row in rows:
        try:
            scores.append(float(row[score_idx]))
        except (IndexError, ValueError):
            continue

    if not scores:
        logger.info(
            "No valid score values detected in reaction rules."
            " Returning scoring mode: %s.", default_mode
        )
        return default_mode
    if any(s > 1 for s in scores):
        logger.info(
            "Score > 1 detected, which is historically associated"
            " with 'minimize' mode. Returning 'minimize'."
        )
        return "minimize"
    logger.info(
        "All scores <= 1 detected, which is historically associated"
        " with 'maximize' mode. Returning 'maximize'."
    )
    return "maximize"


def sniff_rules(path: str, logger: Logger = getLogger(__name__)) -> str:
hydrogen_explicit_patterns = ["[#1"]
n = 10
Expand Down Expand Up @@ -435,4 +574,4 @@ def sniff_rules(path: str, logger: Logger = getLogger(__name__)) -> str:
logger.info("Detect explicit hydrogens in reaction rules")
return "explicit"
logger.info("Detect implicit hydrogen in reaction rules")
return "implicit"
return "implicit"
10 changes: 10 additions & 0 deletions retropath2_wrapper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from retropath2_wrapper.RetroPath2 import (
retropath2,
sniff_rules,
sniff_score_mode,
)
from retropath2_wrapper.Args import (
build_args_parser,
Expand Down Expand Up @@ -102,12 +103,21 @@ def _cli():
else:
std_hydrogen = "H added + Aromatized"

# Sniff scoring mode
if args.score_mode == "auto":
score_mode = sniff_score_mode(path=args.rules_file, logger=logger)
elif args.score_mode in ["maximize", "minimize"]:
score_mode = args.score_mode
else:
parser.error("--score_mode should be one of 'auto', 'maximize' or 'minimize'.")

r_code, result_files = retropath2(
sink_file=args.sink_file,
source_file=args.source_file,
rules_file=args.rules_file,
outdir=args.outdir,
std_hydrogen=std_hydrogen,
score_mode=score_mode,
max_steps=args.max_steps,
topx=args.topx,
dmin=args.dmin,
Expand Down
1 change: 1 addition & 0 deletions retropath2_wrapper/knime.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def call(
args += ['-workflow.variable=output.solutionfile,"%s",String' % (self.standardize_path(files['results']),)]
args += ['-workflow.variable=output.sourceinsinkfile,"%s",String' % (self.standardize_path(files['src-in-sk']),)]
args += ['-workflow.variable=input.std_mode,"%s",String' % (params["std_hydrogen"],)]
args += ['-workflow.variable=input.score_mode,"%s",String' % (params["score_mode"],)]
if preference and preference.is_init():
preference.to_file()
args += ["-preferences=" + self.standardize_path(preference.path)]
Expand Down
Binary file not shown.
Loading