diff --git a/orsopy/slddb/blender.py b/orsopy/slddb/blender.py
new file mode 100644
index 0000000..357c9b7
--- /dev/null
+++ b/orsopy/slddb/blender.py
@@ -0,0 +1,183 @@
+"""
+Pure calculation functions for combining biological sequences and material blends.
+No web-framework dependencies.
+"""
+
+from .dbconfig import DB_FILE
+from .database import SLDDB
+from .material import Material
+from .element_table import get_element
+from .comparators import ExactString
+
+# One-letter codes mapped to the material names that are looked up in the database.
+AMINO_ABRV = {
+    "A": "Alanine",
+    "R": "Arginine",
+    "N": "Asparagine",
+    "D": "Aspartate",
+    "B": "Aspartate",
+    "C": "Cysteine",
+    "E": "Glutamate",
+    "Q": "Glutamine",
+    "Z": "Glutamate",
+    "G": "Glycine",
+    "H": "Histidine",
+    "I": "Isoleucine",
+    "L": "Leucine",
+    "K": "Lysine",
+    "M": "Methionine",
+    "F": "Phenylalanine",
+    "P": "Proline",
+    "S": "Serine",
+    "T": "Threonine",
+    "W": "Tryptophan",
+    "Y": "Tyrosine",
+    "V": "Valine",
+}
+
+RNA_ABRV = {
+    "A": "RNA-Adenine",
+    "G": "RNA-Guanine",
+    "C": "RNA-Cytosine",
+    "U": "RNA-Uracil",
+}
+
+DNA_ABRV = {
+    "A": "DNA-Adenine",
+    "G": "DNA-Guanine",
+    "C": "DNA-Cytosine",
+    "T": "DNA-Thymine",
+}
+
+
+class SequenceParseError(ValueError):
+    """Raised when a sequence or blend string cannot be resolved to database materials."""
+
+
+def clean_str(string):
+    """Return *string* with every whitespace character removed."""
+    return ''.join(string.split())
+
+
+# Hx2O unit (density 1.0) that is added once to every assembled protein/DNA/RNA sequence.
+hx2o = Material([(get_element(element), amount) for element, amount in [('Hx', 2.0), ('O', 1.0)]], dens=1.0)
+
+
+def collect_combination(ids, name_dict):
+    """
+    Sum up the database materials referenced by *ids*.
+
+    ids: iterable of identifiers, e.g. the letters of a sequence string.
+    name_dict: mapping from identifier to the material name searched in the database.
+    Returns the sum of the materials of all identifiers.
+    Raises SequenceParseError for an empty input, an unknown identifier or a name
+    without database entry.
+    """
+    db = SLDDB(DB_FILE)
+    elements: list[Material] = []
+    loaded_ids: dict[str, Material] = {}  # each distinct identifier is queried only once
+    for key in ids:
+        if key not in loaded_ids:
+            try:
+                name = name_dict[key]
+            except KeyError:
+                raise SequenceParseError(
+                    f"Not a valid identifier {key}, options are {''.join(name_dict)}"
+                ) from None
+            # the query stays outside the try above, a KeyError raised by the database
+            # layer must not be reported as an invalid identifier
+            try:
+                entry = db.search_material(name=ExactString(name))[0]
+            except IndexError:
+                raise SequenceParseError(f"Molecule {name} not found in database") from None
+            loaded_ids[key] = db.select_material(entry)
+        elements.append(loaded_ids[key])
+    if not elements:
+        raise SequenceParseError("Empty sequence, no material to combine")
+    result = elements[0]
+    for element in elements[1:]:
+        result += element
+    return result
+
+
+def _collect_sequence(sequence, name_dict, label):
+    """Build the material of a letter sequence (whitespace removed, upper-cased) plus one hx2o."""
+    sequence = clean_str(sequence).upper()
+    result = collect_combination(sequence, name_dict) + hx2o
+    result.extra_data['description'] = f'{label} - {len(sequence)} residues'
+    return result
+
+
+def collect_protein(acids):
+    """Return the material for a protein given as one-letter amino acid codes."""
+    return _collect_sequence(acids, AMINO_ABRV, 'protein')
+
+
+def collect_dna(bases):
+    """Return the material for a DNA strand given as base letters (A, G, C, T)."""
+    return _collect_sequence(bases, DNA_ABRV, 'DNA')
+
+
+def collect_rna(bases):
+    """Return the material for an RNA strand given as base letters (A, G, C, U)."""
+    return _collect_sequence(bases, RNA_ABRV, 'RNA')
+
+
+def collect_blendIDs(formula):
+    """
+    Combine database materials given as "(amount*ID)" terms, e.g. "(0.5*12)(1.5*34)".
+
+    Returns the amount-weighted sum of the referenced materials.
+    Raises SequenceParseError if a term is malformed, no term is found or an ID is
+    missing from the database.
+    """
+    # remove whitespace once up front; before, only the loop test ignored it while the
+    # term parser failed on e.g. "(1*2) (3*4)"
+    formula = clean_str(formula)
+    items = []
+    while '(' in formula:
+        pre, closing, formula = formula.partition(')')
+        amount, star, ID = pre.partition('*')
+        if not (closing and star):
+            raise SequenceParseError(f"Malformed blend term {pre!r}, expected (amount*ID)")
+        try:
+            items.append((float(amount.strip('(')), int(ID)))
+        except ValueError:
+            raise SequenceParseError(f"Malformed blend term {pre!r}, expected (amount*ID)") from None
+    if not items:
+        raise SequenceParseError("No (amount*ID) term found in blend string")
+    db = SLDDB(DB_FILE)
+    loaded_ids = {}  # each distinct ID is read from the database only once
+    elements: list[Material] = []
+    for number, ID in items:
+        if ID not in loaded_ids:
+            try:
+                entry = db.search_material(ID=ID)[0]
+            except IndexError:
+                raise SequenceParseError(f"Material ID {ID} not found in database") from None
+            loaded_ids[ID] = db.select_material(entry)
+        elements.append(number * loaded_ids[ID])
+    result = elements[0]
+    for element in elements[1:]:
+        result += element
+    return result
+
+
+def collect_blend(mtype, idstr):
+    """
+    Dispatch *idstr* to the collector selected by *mtype*.
+
+    mtype: one of 'protein', 'dna', 'rna' or 'db'.
+    Raises ValueError for any other *mtype* instead of silently returning None.
+    """
+    collectors = {
+        'protein': collect_protein,
+        'dna': collect_dna,
+        'rna': collect_rna,
+        'db': collect_blendIDs,
+    }
+    try:
+        collector = collectors[mtype]
+    except KeyError:
+        raise ValueError(f"Unknown blend type {mtype!r}, options are {', '.join(collectors)}") from None
+    return collector(idstr)
diff --git a/orsopy/slddb/tests/test_webapi.py b/orsopy/slddb/tests/test_webapi.py
index aaffc21..ae6ffd6 100644
--- a/orsopy/slddb/tests/test_webapi.py
+++ b/orsopy/slddb/tests/test_webapi.py
@@ -86,7 +86,7 @@ def test_a_downloaddb(self):
         if not self.server_available:
             return
         # make sure the path of the module is correct and that the database has not been downloaded
-        self.assertTrue(api.first_access)
+        self.assertTrue(api.update_db)
         # self.assertEqual(slddb.__file__, os.path.join(self.path, 'slddb', '__init__.py'))
         self.assertFalse(os.path.exists(slddb.DB_FILE))
         # test of database download
@@ -115,26 +115,26 @@ def test_a_downloaddb(self):
     def test_b_check(self):
         if not self.server_available:
             return
-        api.first_access = True
+        api.update_db = True
         if os.path.isfile(slddb.DB_FILE):
             os.remove(slddb.DB_FILE)
         api.check()
-        self.assertFalse(api.first_access)
-        api.first_access = True
+        self.assertFalse(api.update_db)
+        api.update_db = True
         api.check()
-        self.assertFalse(api.first_access)
+        self.assertFalse(api.update_db)
         api.check()
         # check the update case
         api.db.db.close()
         del api.db
-        api.first_access = True
+        api.update_db = True
         api.max_age = -1
         api.check()
         api.max_age = 1
         # check warning if download url doesn't work during update
         api.db.db.close()
         del api.db
-        api.first_access = True
+        api.update_db = True
         api.max_age = -1
 
         from orsopy.slddb import dbconfig, webapi
diff --git a/orsopy/slddb/webapi.py b/orsopy/slddb/webapi.py
index 6facd54..22e8f11 100644
--- a/orsopy/slddb/webapi.py
+++ b/orsopy/slddb/webapi.py
@@ -9,7 +9,8 @@
 from urllib.error import URLError
 
 from . import DB_FILE, SLDDB
-from .dbconfig import WEBAPI_URL
+from .dbconfig import WEBAPI_URL, DB_MATERIALS_FIELDS, DB_MATERIALS_HIDDEN_DATA, db_lookup
+from .blender import collect_protein, collect_dna, collect_rna
 from .element_table import get_element
 from .material import Formula, Material
 
@@ -46,13 +47,13 @@ class SLD_API:
     max_age = 1
     db: SLDDB = None
 
-    def __init__(self):
-        self.first_access = True
-        self.use_webquery = True  # only try webquery once, if error occurs switch to local database
+    def __init__(self, update_db=True):
+        self.update_db = update_db
+        self.use_webquery = False  # default to using local database, which is updated regularly
 
     def check(self):
         # make sure the local database file is up to date, if not try to download newest version
-        if self.first_access:
+        if self.update_db:
             now = datetime.datetime.now()
             try:
                 stat = pathlib.Path(DB_FILE).stat()
@@ -70,7 +71,7 @@ def check(self):
                 except URLError as err:
                     warnings.warn("Can't download new version of database; " + str(err))
             self.db = SLDDB(DB_FILE)  # after potential update, make connection with local database
-            self.first_access = False
+            self.update_db = False
         else:
             return
 
@@ -95,11 +96,8 @@ def webquery(qdict):
         return json.loads(webdata.read())  # return decoded data
 
     def localquery(self, qdict):
-        return self.db.search_material(**qdict)
-
-    def localmaterial(self, ID):
-        res = self.db.search_material(ID=ID)
-        return self.db.select_material(res[0])
+        self.check()  # use_webquery now defaults to False, so this can be the first database access
+        return query_api(qdict)
 
     def search(self, **opts):
         """
@@ -130,29 +128,24 @@ def material(self, ID):
             material=api.material(res[0]['ID'])
             print(material.dens, material.rho_n, material.f_of_E(8.0))
         """
-        if not self.use_webquery:
-            return self.localmaterial(ID)
 
-        self.check()
-        try:
-            res = self.webquery({"ID": int(ID)})
-        except URLError:
-            self.use_webquery = False
-            return self.localmaterial(ID)
-        else:
-            f = Formula(res["formula"], sort=False)
-            mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={})
-            if res.get("name", None):
-                mat_data["name"] = res["name"]
-            if res.get("mu", 0.0):
-                mat_data["mu"] = res["mu"]
-            elif res.get("M", 0.0):
-                mat_data["M"] = res["M"]
-            for key in ["ORSO_validated", "description", "doi", "reference"]:
-                if key in res:
-                    mat_data["extra_data"][key] = res[key]
-            out = Material([(get_element(element), amount) for element, amount in f], **mat_data)
-            return out
+        res = self.search(ID=int(ID))
+        if isinstance(res, str):  # a failed local query returns a message string instead of data
+            raise ValueError(res)
+
+        f = Formula(res["formula"], sort=False)
+        mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={})
+        if res.get("name", None):
+            mat_data["name"] = res["name"]
+        if res.get("mu", 0.0):
+            mat_data["mu"] = res["mu"]
+        elif res.get("M", 0.0):
+            mat_data["M"] = res["M"]
+        for key in ["ORSO_validated", "description", "doi", "reference"]:
+            if key in res:
+                mat_data["extra_data"][key] = res[key]
+        out = Material([(get_element(element), amount) for element, amount in f], **mat_data)
+        return out
 
     @staticmethod
     def custom(formula, dens=None, fu_volume=None, rho_n=None, mu=0.0, xsld=None, xE=None):
@@ -180,7 +173,9 @@ def bio_blender(self, sequence, molecule="protein"):
         Get material for protein, DNA or RNA. Provide a letter sequence and molecule type ('protein', 'dna', 'rna').
         """
         opts = {molecule.lower(): sequence, "sldcalc": "true"}
-        res = self.webquery(opts)
+        res = self.search(**opts)
+        if isinstance(res, str):  # a failed local calculation returns a message string instead of data
+            raise ValueError(res)
         mat_data = dict(fu_volume=float(res["fu_volume"]), name=f"BioBlender-{molecule.lower()}", extra_data={})
         for key in [
             "description",
@@ -190,3 +185,103 @@ def bio_blender(self, sequence, molecule="protein"):
 
         out = Material(Formula(res["formula"]), **mat_data)
         return out
+
+
+# webquery API functions:
+def calc_api(args):
+    """Calculate SLD from formula/density or biological sequence.
+
+    args: dict-like with optional keys: formula, density, protein, dna, rna,
+          name, material_description, xray_unit.
+    Returns the data exported by the calculated material (material.export), or an
+    error message string if the input is incomplete or cannot be evaluated.
+    """
+    try:
+        if 'protein' in args:
+            material = collect_protein(args['protein'])
+            default_name = 'protein'
+        elif 'dna' in args:
+            material = collect_dna(args['dna'])
+            default_name = 'DNA'
+        elif 'rna' in args:
+            material = collect_rna(args['rna'])
+            default_name = 'RNA'
+        elif 'formula' in args and 'density' in args:
+            # the formula is parsed inside the try as well, a malformed formula is
+            # reported as text like every other invalid input
+            f = Formula(args['formula'], sort=False)
+            material = Material(f, dens=float(args['density']))
+            default_name = 'User Query'
+        else:
+            return 'Could not calculate, missing formula and density or protein/dna/rna sequence'
+    except Exception as e:
+        return repr(e)
+    material.name = args.get('name', default_name)
+    if args.get('material_description', '') != '':
+        material.extra_data['description'] = args['material_description']
+    return material.export(xray_units=args.get('xray_unit', 'edens'))
+
+
+def select_api(args):
+    """Return the exported data of a material selected by ID.
+
+    args: dict-like with keys: ID, and optionally xray_unit.
+    Returns the result of material.export, or an error message string if the ID is
+    not in the database or the entry cannot be converted to a material.
+    """
+    db = SLDDB(DB_FILE)
+    res = db.search_material(filter_invalid=False, ID=int(args['ID']))
+    if not res:
+        return '## ID not found in database'
+    try:
+        material = db.select_material(res[0])
+    except Exception as e:
+        return repr(e) + '\n' + "Raised when tried to parse material = %s" % (res[0],)
+    return material.export(xray_units=args.get('xray_unit', 'edens'))
+
+
+def search_api(args):
+    """Search the database with the given field values.
+
+    args: dict-like mapping DB field names to query values; empty values and keys
+          that are not database fields are ignored.
+    Returns the list of matching entries with the hidden fields removed (fields
+    starting with 'validated' are kept), or an error message string if a value
+    cannot be converted.
+    """
+    query = {}
+    for key, value in args.items():
+        if str(value).strip() == '':
+            continue
+        if key in DB_MATERIALS_FIELDS:
+            try:
+                query[key] = db_lookup[key][1].convert(str(value))
+            except Exception as e:
+                return repr(e) + '\n' + "Raised when tried to parse %s = %s" % (key, value)
+    db = SLDDB(DB_FILE)
+    res = db.search_material(serializable=True, limit=10000, **query)
+
+    # remove hidden database fields besides ORSO validation
+    hidden = [field for field in DB_MATERIALS_HIDDEN_DATA if not field.startswith('validated')]
+    for ri in res:
+        for field in hidden:
+            ri.pop(field, None)  # an entry without the field is not an error
+    return res
+
+
+def query_api(args):
+    """Dispatch an API request based on which keys are present in args.
+
+    args: dict-like (e.g. request.args or a plain dict).
+    Returns the result of the selected handler: exported material data for 'ID' and
+    'sldcalc', the list of non-hidden field names for 'get_fields' and the search
+    result otherwise; the handlers report invalid input as a message string.
+    """
+    if 'ID' in args:
+        return select_api(args)
+    elif 'sldcalc' in args:
+        return calc_api(args)
+    elif 'get_fields' in args:
+        return [field for field in DB_MATERIALS_FIELDS if field not in DB_MATERIALS_HIDDEN_DATA]
+    else:
+        return search_api(args)