Skip to content
Open
136 changes: 136 additions & 0 deletions orsopy/slddb/blender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
Pure calculation functions for combining biological sequences and material blends.
No web-framework dependencies.
"""

from .dbconfig import DB_FILE
from .database import SLDDB
from .material import Material
from .element_table import get_element
from .comparators import ExactString

# One-letter amino-acid codes mapped to molecule names.
# collect_protein() passes this mapping to collect_combination(), which looks
# each mapped name up in the database with an exact string match.
AMINO_ABRV = {
"A": "Alanine",
"R": "Arginine",
"N": "Asparagine",
"D": "Aspartate",
# "B" resolves to the same entry as "D".
# NOTE(review): B is commonly used as the Asp/Asn ambiguity code - confirm this choice.
"B": "Aspartate",
"C": "Cysteine",
"E": "Glutamate",
"Q": "Glutamine",
# "Z" resolves to the same entry as "E".
# NOTE(review): Z is commonly used as the Glu/Gln ambiguity code - confirm this choice.
"Z": "Glutamate",
"G": "Glycine",
"H": "Histidine",
"I": "Isoleucine",
"L": "Leucine",
"K": "Lysine",
"M": "Methionine",
"F": "Phenylalanine",
"P": "Proline",
"S": "Serine",
"T": "Threonine",
"W": "Tryptophan",
"Y": "Tyrosine",
"V": "Valine",
}

# One-letter RNA base codes mapped to molecule names.
# collect_rna() passes this mapping to collect_combination(), which looks each
# mapped name up in the database with an exact string match.
RNA_ABRV = {
"A": "RNA-Adenine",
"G": "RNA-Guanine",
"C": "RNA-Cytosine",
"U": "RNA-Uracil",
}

# One-letter DNA base codes mapped to molecule names.
# collect_dna() passes this mapping to collect_combination(), which looks each
# mapped name up in the database with an exact string match.
DNA_ABRV = {
"A": "DNA-Adenine",
"G": "DNA-Guanine",
"C": "DNA-Cytosine",
"T": "DNA-Thymine",
}


class SequenceParseError(ValueError):
    """Raised when a sequence cannot be turned into a material.

    Used for identifiers that are missing from the abbreviation table and for
    molecules that cannot be found in the database.
    """


def clean_str(string):
    """Return *string* with all whitespace removed.

    Sequences are often pasted from formatted sources, so every whitespace
    character recognised by ``str.split`` is dropped wherever it occurs
    (spaces, tabs, line breaks, form feeds, non-breaking spaces, ...), not
    only the plain space, tab, CR and LF characters.

    Args:
        string: The raw text to clean.

    Returns:
        The text without any whitespace characters.
    """
    # str.split() without arguments splits on runs of any whitespace and
    # discards leading/trailing whitespace, so joining the pieces removes it all.
    return ''.join(string.split())


# Material built from 2 'Hx' and 1 'O' with density 1.0; it is added once to
# every assembled protein/DNA/RNA sequence below.
# NOTE(review): 'Hx' presumably denotes exchangeable hydrogen - confirm in element_table.
hx2o = Material(
    [
        (get_element('Hx'), 2.0),
        (get_element('O'), 1.0),
    ],
    dens=1.0,
)


def collect_combination(ids, name_dict):
    """Sum the database materials named by a sequence of identifiers.

    Every item of *ids* is translated to a molecule name through *name_dict*,
    the molecule is fetched from the local database (once per distinct
    identifier) and all materials are added up, one term per item.

    Args:
        ids: Iterable of identifiers, e.g. a string of one-letter codes.
        name_dict: Mapping from identifier to the molecule name that is
            searched for (exact match) in the database.

    Returns:
        The sum of the materials for all items of *ids*.

    Raises:
        SequenceParseError: If *ids* yields no items, contains an identifier
            that is not a key of *name_dict*, or names a molecule that is
            not found in the database.
    """
    db = SLDDB(DB_FILE)
    elements: list[Material] = []
    loaded_ids: dict[str, Material] = {}
    for identifier in ids:
        if identifier not in loaded_ids:
            # Translate the identifier on its own, so that a KeyError raised
            # inside the database layer is never mistaken for a bad identifier.
            try:
                name = name_dict[identifier]
            except KeyError as err:
                raise SequenceParseError(
                    f"Not a valid identifier {identifier}, options are {''.join(name_dict.keys())}"
                ) from err
            try:
                entry = db.search_material(name=ExactString(name))[0]
            except IndexError as err:
                raise SequenceParseError(f"Molecule {name} not found in database") from err
            loaded_ids[identifier] = db.select_material(entry)
        elements.append(loaded_ids[identifier])
    if not elements:
        # Previously this surfaced as a bare IndexError from elements[0].
        raise SequenceParseError("Empty sequence, nothing to combine")
    result = elements[0]
    for element in elements[1:]:
        # Use `+` rather than `+=`: the first element is the cached database
        # material, which must not be modified should Material implement
        # in-place addition.
        result = result + element
    return result


def collect_protein(acids):
    """Build the material for a protein given as one-letter amino-acid codes.

    Whitespace is removed and the sequence is upper-cased before the residues
    are combined via ``AMINO_ABRV``; ``hx2o`` is added once to the sum and the
    residue count is stored in the material's description.
    """
    sequence = clean_str(acids).upper()
    protein = collect_combination(sequence, AMINO_ABRV) + hx2o
    protein.extra_data['description'] = f'protein - {len(sequence)} residues'
    return protein


def collect_dna(bases):
    """Build the material for a DNA strand given as one-letter base codes.

    Whitespace is removed and the sequence is upper-cased before the bases
    are combined via ``DNA_ABRV``; ``hx2o`` is added once to the sum and the
    residue count is stored in the material's description.
    """
    sequence = clean_str(bases).upper()
    strand = collect_combination(sequence, DNA_ABRV) + hx2o
    strand.extra_data['description'] = f'DNA - {len(sequence)} residues'
    return strand


def collect_rna(bases):
    """Build the material for an RNA strand given as one-letter base codes.

    Whitespace is removed and the sequence is upper-cased before the bases
    are combined via ``RNA_ABRV``; ``hx2o`` is added once to the sum and the
    residue count is stored in the material's description.
    """
    sequence = clean_str(bases).upper()
    strand = collect_combination(sequence, RNA_ABRV) + hx2o
    strand.extra_data['description'] = f'RNA - {len(sequence)} residues'
    return strand


def collect_blendIDs(formula):
    """Build a blended material from a formula of database IDs.

    The formula is a run of terms ``(amount*ID)``, for example
    ``(2.0*15)(1.0*7)``. Whitespace anywhere in the formula is ignored.
    Each ID is fetched from the local database once and scaled by its amount;
    the scaled materials are then added up.

    Args:
        formula: The blend description string.

    Returns:
        The sum of ``amount * material`` over all terms.

    Raises:
        SequenceParseError: If the formula contains no terms, a term is
            malformed, or an ID is not present in the database.
            (SequenceParseError is a ValueError.)
    """
    # Strip whitespace once up front; otherwise whitespace between two terms
    # ends up in front of the '(' and the amount cannot be parsed.
    remaining = clean_str(formula)
    items = []
    while '(' in remaining:
        term, closing, remaining = remaining.partition(')')
        if not closing:
            raise SequenceParseError(f"Missing closing bracket in blend term '{term}'")
        amount_text, _, id_text = term.partition('*')
        try:
            number = float(amount_text.strip('('))
            material_id = int(id_text)
        except ValueError as err:
            raise SequenceParseError(
                f"Can't parse blend term '{term})', expected the form (amount*ID)"
            ) from err
        items.append((number, material_id))
    if not items:
        raise SequenceParseError("No blend terms of the form (amount*ID) found")

    db = SLDDB(DB_FILE)
    elements: list[Material] = []
    loaded_ids = {}
    for number, material_id in items:
        if material_id not in loaded_ids:
            try:
                entry = db.search_material(ID=material_id)[0]
            except IndexError as err:
                raise SequenceParseError(f"ID {material_id} not found in database") from err
            loaded_ids[material_id] = db.select_material(entry)
        elements.append(number * loaded_ids[material_id])

    # A plain loop is used instead of sum(elements): sum() starts from the
    # int 0, so it would only work if Material supports ``0 + Material``,
    # which is not established here.
    result = elements[0]
    for element in elements[1:]:
        result = result + element
    return result


Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd propose simplifying this by using a dictionary of collect functions and `.get`:

COLLECT_FUNCTIONS = {
  "protein": collect_protein,
  "dna": collect_dna,
  "rna": collect_rna,
  "db": collect_blendIDs,
}

def collect_blend(mtype, idstr):
    """Dispatch *idstr* to the collect function that matches *mtype*.

    Supported types are 'protein', 'dna', 'rna' (letter sequences) and 'db'
    (a blend formula of database IDs). Any other *mtype* yields ``None``.
    """
    if mtype not in ('protein', 'dna', 'rna', 'db'):
        return None
    collectors = {
        'protein': collect_protein,
        'dna': collect_dna,
        'rna': collect_rna,
        'db': collect_blendIDs,
    }
    return collectors[mtype](idstr)
14 changes: 7 additions & 7 deletions orsopy/slddb/tests/test_webapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def test_a_downloaddb(self):
if not self.server_available:
return
# make sure the path of the module is correct and that the database has not been downloaded
self.assertTrue(api.first_access)
self.assertTrue(api.update_db)
# self.assertEqual(slddb.__file__, os.path.join(self.path, 'slddb', '__init__.py'))
self.assertFalse(os.path.exists(slddb.DB_FILE))
# test of database download
Expand Down Expand Up @@ -115,26 +115,26 @@ def test_a_downloaddb(self):
def test_b_check(self):
if not self.server_available:
return
api.first_access = True
api.update_db = True
if os.path.isfile(slddb.DB_FILE):
os.remove(slddb.DB_FILE)
api.check()
self.assertFalse(api.first_access)
api.first_access = True
self.assertFalse(api.update_db)
api.update_db = True
api.check()
self.assertFalse(api.first_access)
self.assertFalse(api.update_db)
api.check()
# check the update case
api.db.db.close()
del api.db
api.first_access = True
api.update_db = True
api.max_age = -1
api.check()
api.max_age = 1
# check warning if download url doesn't work during update
api.db.db.close()
del api.db
api.first_access = True
api.update_db = True
api.max_age = -1

from orsopy.slddb import dbconfig, webapi
Expand Down
168 changes: 134 additions & 34 deletions orsopy/slddb/webapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from urllib.error import URLError

from . import DB_FILE, SLDDB
from .dbconfig import WEBAPI_URL
from .dbconfig import WEBAPI_URL, DB_MATERIALS_FIELDS, DB_MATERIALS_HIDDEN_DATA, db_lookup
from .blender import collect_protein, collect_dna, collect_rna
from .element_table import get_element
from .material import Formula, Material

Expand Down Expand Up @@ -46,13 +47,13 @@ class SLD_API:
max_age = 1
db: SLDDB = None

def __init__(self):
self.first_access = True
self.use_webquery = True # only try webquery once, if error occurs switch to local database

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the default just a DB download will break the SLDdb statistical functions that are used to improve search results (i.e. returning the most searched and most selected materials first).

def __init__(self, update_db=True):
self.update_db = update_db
self.use_webquery = False # default to using local database, which is updated regularly

def check(self):
# make sure the local database file is up to date, if not try to download newest version
if self.first_access:
if self.update_db:
now = datetime.datetime.now()
try:
stat = pathlib.Path(DB_FILE).stat()
Expand All @@ -70,7 +71,7 @@ def check(self):
except URLError as err:
warnings.warn("Can't download new version of database; " + str(err))
self.db = SLDDB(DB_FILE) # after potential update, make connection with local database
self.first_access = False
self.update_db = False
else:
return

Expand All @@ -95,11 +96,7 @@ def webquery(qdict):
return json.loads(webdata.read()) # return decoded data

def localquery(self, qdict):
return self.db.search_material(**qdict)

def localmaterial(self, ID):
res = self.db.search_material(ID=ID)
return self.db.select_material(res[0])
return query_api(qdict)

def search(self, **opts):
"""
Expand Down Expand Up @@ -130,29 +127,22 @@ def material(self, ID):
material=api.material(res[0]['ID'])
print(material.dens, material.rho_n, material.f_of_E(8.0))
"""
if not self.use_webquery:
return self.localmaterial(ID)

self.check()
try:
res = self.webquery({"ID": int(ID)})
except URLError:
self.use_webquery = False
return self.localmaterial(ID)
else:
f = Formula(res["formula"], sort=False)
mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={})
if res.get("name", None):
mat_data["name"] = res["name"]
if res.get("mu", 0.0):
mat_data["mu"] = res["mu"]
elif res.get("M", 0.0):
mat_data["M"] = res["M"]
for key in ["ORSO_validated", "description", "doi", "reference"]:
if key in res:
mat_data["extra_data"][key] = res[key]
out = Material([(get_element(element), amount) for element, amount in f], **mat_data)
return out
res = self.search(ID=int(ID))

f = Formula(res["formula"], sort=False)
mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={})
if res.get("name", None):
mat_data["name"] = res["name"]
if res.get("mu", 0.0):
mat_data["mu"] = res["mu"]
elif res.get("M", 0.0):
mat_data["M"] = res["M"]
for key in ["ORSO_validated", "description", "doi", "reference"]:
if key in res:
mat_data["extra_data"][key] = res[key]
out = Material([(get_element(element), amount) for element, amount in f], **mat_data)
return out

@staticmethod
def custom(formula, dens=None, fu_volume=None, rho_n=None, mu=0.0, xsld=None, xE=None):
Expand Down Expand Up @@ -180,7 +170,7 @@ def bio_blender(self, sequence, molecule="protein"):
Get material for protein, DNA or RNA. Provide a letter sequence and molecule type ('protein', 'dna', 'rna').
"""
opts = {molecule.lower(): sequence, "sldcalc": "true"}
res = self.webquery(opts)
res = self.search(**opts)
mat_data = dict(fu_volume=float(res["fu_volume"]), name=f"BioBlender-{molecule.lower()}", extra_data={})
for key in [
"description",
Expand All @@ -190,3 +180,113 @@ def bio_blender(self, sequence, molecule="protein"):

out = Material(Formula(res["formula"]), **mat_data)
return out


# webquery API functions:
def calc_api(args):
    """Calculate SLD data from a biological sequence or from formula and density.

    The first matching input wins, checked in this order: ``protein``,
    ``dna``, ``rna``, then ``formula`` together with ``density``.

    Args:
        args: dict-like with optional keys: formula, density, protein, dna,
            rna, name, material_description, xray_unit.

    Returns:
        On success, the result of ``material.export(...)`` (described as a
        JSON string by the original author - NOTE(review): confirm).
        On failure, a plain string: the ``repr`` of the exception raised
        while building the material, or a fixed message when neither a
        sequence nor formula+density was supplied.
    """
    try:
        if 'protein' in args:
            material, default_name = collect_protein(args['protein']), 'protein'
        elif 'dna' in args:
            material, default_name = collect_dna(args['dna']), 'DNA'
        elif 'rna' in args:
            material, default_name = collect_rna(args['rna']), 'RNA'
        elif 'formula' in args and 'density' in args:
            # Formula parsing happens inside the try block so that an invalid
            # formula is reported like every other construction error
            # instead of propagating to the caller.
            formula = Formula(args['formula'], sort=False)
            material, default_name = Material(formula, dens=float(args['density'])), 'User Query'
        else:
            return 'Could not calculate, missing formula and density or protein/dna/rna sequence'
    except Exception as e:
        # API boundary: errors are handed back to the client as text.
        return repr(e)

    material.name = args.get('name', default_name)
    description = args.get('material_description', '')
    if description != '':
        material.extra_data['description'] = description
    return material.export(xray_units=args.get('xray_unit', 'edens'))


def select_api(args):
    """Return the exported data of the material selected by ID.

    Args:
        args: dict-like with key ``ID`` and optionally ``xray_unit``.

    Returns:
        On success, the result of ``material.export(...)`` (described as a
        JSON string by the original author - NOTE(review): confirm).
        If no entry matches the ID, the string '## ID not found in database'.
        If the entry cannot be converted to a material, a string with the
        ``repr`` of the exception and the offending entry.
    """
    db = SLDDB(DB_FILE)
    res = db.search_material(filter_invalid=False, ID=int(args['ID']))
    # Only a missing search result means "not found"; an IndexError raised
    # while building the material is reported as a parse problem below.
    try:
        entry = res[0]
    except IndexError:
        return '## ID not found in database'
    try:
        material = db.select_material(entry)
    except Exception as e:
        # f-string instead of "%s" % entry: the latter misbehaves if the
        # entry happens to be a tuple.
        return repr(e) + '<br >' + f"Raised when tried to parse material = {entry}"
    return material.export(xray_units=args.get('xray_unit', 'edens'))


def search_api(args):
    """Search the database with the given field values.

    Entries of *args* whose value is blank, or whose key is not one of
    ``DB_MATERIALS_FIELDS``, are ignored. The remaining values are converted
    with the converter registered for the field in ``db_lookup``.

    Args:
        args: dict-like mapping DB field names to query values.

    Returns:
        The list of search results with hidden fields removed (fields whose
        name starts with 'validated' are kept). If a value cannot be
        converted, a string with the ``repr`` of the exception and the
        offending key/value instead.
    """
    query = {}
    for key, value in args.items():
        if str(value).strip() == '' or key not in DB_MATERIALS_FIELDS:
            continue
        try:
            query[key] = db_lookup[key][1].convert(str(value))
        except Exception as e:
            return repr(e) + '<br >' + "Raised when tried to parse %s = %s" % (key, value)
    db = SLDDB(DB_FILE)
    res = db.search_material(serializable=True, limit=10000, **query)

    # remove hidden database fields besides ORSO validation
    hidden_fields = [field for field in DB_MATERIALS_HIDDEN_DATA if not field.startswith('validated')]
    for ri in res:
        for field in hidden_fields:
            # Tolerate rows that do not carry a hidden field at all.
            if field in ri:
                del ri[field]

    return res


def query_api(args):
    """Route an API request according to the keys present in *args*.

    Precedence: ``ID`` selects a material, ``sldcalc`` runs a calculation,
    ``get_fields`` lists the non-hidden database fields, and anything else
    is treated as a database search.

    Args:
        args: dict-like (e.g. request.args or a plain dict).

    Returns:
        Whatever the selected handler returns; for ``get_fields`` a list of
        field names.
    """
    if 'ID' in args:
        return select_api(args)
    if 'sldcalc' in args:
        return calc_api(args)
    if 'get_fields' not in args:
        return search_api(args)
    return [
        name for name in DB_MATERIALS_FIELDS if name not in DB_MATERIALS_HIDDEN_DATA
    ]
Loading