diff --git a/orsopy/slddb/blender.py b/orsopy/slddb/blender.py
new file mode 100644
index 0000000..357c9b7
--- /dev/null
+++ b/orsopy/slddb/blender.py
@@ -0,0 +1,136 @@
+"""
+Pure calculation functions for combining biological sequences and material blends.
+No web-framework dependencies.
+"""
+
+from .dbconfig import DB_FILE
+from .database import SLDDB
+from .material import Material
+from .element_table import get_element
+from .comparators import ExactString
+
+AMINO_ABRV = {
+ "A": "Alanine",
+ "R": "Arginine",
+ "N": "Asparagine",
+ "D": "Aspartate",
+ "B": "Aspartate",
+ "C": "Cysteine",
+ "E": "Glutamate",
+ "Q": "Glutamine",
+ "Z": "Glutamate",
+ "G": "Glycine",
+ "H": "Histidine",
+ "I": "Isoleucine",
+ "L": "Leucine",
+ "K": "Lysine",
+ "M": "Methionine",
+ "F": "Phenylalanine",
+ "P": "Proline",
+ "S": "Serine",
+ "T": "Threonine",
+ "W": "Tryptophan",
+ "Y": "Tyrosine",
+ "V": "Valine",
+}
+
+RNA_ABRV = {
+ "A": "RNA-Adenine",
+ "G": "RNA-Guanine",
+ "C": "RNA-Cytosine",
+ "U": "RNA-Uracil",
+}
+
+DNA_ABRV = {
+ "A": "DNA-Adenine",
+ "G": "DNA-Guanine",
+ "C": "DNA-Cytosine",
+ "T": "DNA-Thymine",
+}
+
+
+class SequenceParseError(ValueError):
+ pass
+
+
+def clean_str(string):
+ return string.replace('\n', '').replace('\r', '').replace('\t', '').replace(' ', '').strip()
+
+
+hx2o = Material([(get_element(element), amount) for element, amount in [('Hx', 2.0), ('O', 1.0)]], dens=1.0)
+
+
+def collect_combination(ids, name_dict):
+ db = SLDDB(DB_FILE)
+ elements: list[Material] = []
+ loaded_ids: dict[str, Material] = {}
+ for id in ids:
+ if id not in loaded_ids:
+ try:
+ entry = db.search_material(name=ExactString(name_dict[id]))[0]
+ except KeyError:
+ possible_ids = name_dict.keys()
+ raise SequenceParseError(f"Not a valid identifier {id}, options are {''.join(possible_ids)}")
+ except IndexError:
+ raise SequenceParseError(f"Molecule {name_dict[id]} not found in database")
+ m = db.select_material(entry)
+ loaded_ids[id] = m
+ elements.append(loaded_ids[id])
+ result = elements[0]
+ for element in elements[1:]:
+ result += element
+ return result
+
+
+def collect_protein(acids):
+ acids = clean_str(acids).upper()
+ result = collect_combination(acids, AMINO_ABRV) + hx2o
+ result.extra_data['description'] = f'protein - {len(acids)} residues'
+ return result
+
+
+def collect_dna(bases):
+ bases = clean_str(bases).upper()
+ result = collect_combination(bases, DNA_ABRV) + hx2o
+ result.extra_data['description'] = f'DNA - {len(bases)} residues'
+ return result
+
+
+def collect_rna(bases):
+ bases = clean_str(bases).upper()
+ result = collect_combination(bases, RNA_ABRV) + hx2o
+ result.extra_data['description'] = f'RNA - {len(bases)} residues'
+ return result
+
+
+def collect_blendIDs(formula):
+ db = SLDDB(DB_FILE)
+ elements: list[Material] = []
+ loaded_ids = {}
+ items = []
+ while '(' in clean_str(formula):
+ pre, formula = formula.split(')', 1)
+ number = float(pre.split('*', 1)[0].strip('(').strip())
+ ID = int(pre.split('*', 1)[1].strip())
+ items.append((number, ID))
+ for number, ID in items:
+ if ID not in loaded_ids:
+ entry = db.search_material(ID=ID)[0]
+ m = db.select_material(entry)
+ loaded_ids[ID] = m
+ elements.append(number * loaded_ids[ID])
+ result = elements[0]
+ for element in elements[1:]:
+ result += element
+ return result
+
+
+def collect_blend(mtype, idstr):
+ if mtype == 'protein':
+ return collect_protein(idstr)
+ elif mtype == 'dna':
+ return collect_dna(idstr)
+ elif mtype == 'rna':
+ return collect_rna(idstr)
+ elif mtype == 'db':
+ return collect_blendIDs(idstr)
diff --git a/orsopy/slddb/tests/test_webapi.py b/orsopy/slddb/tests/test_webapi.py
index aaffc21..ae6ffd6 100644
--- a/orsopy/slddb/tests/test_webapi.py
+++ b/orsopy/slddb/tests/test_webapi.py
@@ -86,7 +86,7 @@ def test_a_downloaddb(self):
if not self.server_available:
return
# make sure the path of the module is correct and that the database has not been downloaded
- self.assertTrue(api.first_access)
+ self.assertTrue(api.update_db)
# self.assertEqual(slddb.__file__, os.path.join(self.path, 'slddb', '__init__.py'))
self.assertFalse(os.path.exists(slddb.DB_FILE))
# test of database download
@@ -115,26 +115,26 @@ def test_a_downloaddb(self):
def test_b_check(self):
if not self.server_available:
return
- api.first_access = True
+ api.update_db = True
if os.path.isfile(slddb.DB_FILE):
os.remove(slddb.DB_FILE)
api.check()
- self.assertFalse(api.first_access)
- api.first_access = True
+ self.assertFalse(api.update_db)
+ api.update_db = True
api.check()
- self.assertFalse(api.first_access)
+ self.assertFalse(api.update_db)
api.check()
# check the update case
api.db.db.close()
del api.db
- api.first_access = True
+ api.update_db = True
api.max_age = -1
api.check()
api.max_age = 1
# check warning if download url doesn't work during update
api.db.db.close()
del api.db
- api.first_access = True
+ api.update_db = True
api.max_age = -1
from orsopy.slddb import dbconfig, webapi
diff --git a/orsopy/slddb/webapi.py b/orsopy/slddb/webapi.py
index 6facd54..22e8f11 100644
--- a/orsopy/slddb/webapi.py
+++ b/orsopy/slddb/webapi.py
@@ -9,7 +9,8 @@
from urllib.error import URLError
from . import DB_FILE, SLDDB
-from .dbconfig import WEBAPI_URL
+from .dbconfig import WEBAPI_URL, DB_MATERIALS_FIELDS, DB_MATERIALS_HIDDEN_DATA, db_lookup
+from .blender import collect_protein, collect_dna, collect_rna
from .element_table import get_element
from .material import Formula, Material
@@ -46,13 +47,13 @@ class SLD_API:
max_age = 1
db: SLDDB = None
- def __init__(self):
- self.first_access = True
- self.use_webquery = True # only try webquery once, if error occurs switch to local database
+ def __init__(self, update_db=True):
+ self.update_db = update_db
+ self.use_webquery = False # default to using local database, which is updated regularly
def check(self):
# make sure the local database file is up to date, if not try to download newest version
- if self.first_access:
+ if self.update_db:
now = datetime.datetime.now()
try:
stat = pathlib.Path(DB_FILE).stat()
@@ -70,7 +71,7 @@ def check(self):
except URLError as err:
warnings.warn("Can't download new version of database; " + str(err))
self.db = SLDDB(DB_FILE) # after potential update, make connection with local database
- self.first_access = False
+ self.update_db = False
else:
return
@@ -95,11 +96,7 @@ def webquery(qdict):
return json.loads(webdata.read()) # return decoded data
def localquery(self, qdict):
- return self.db.search_material(**qdict)
-
- def localmaterial(self, ID):
- res = self.db.search_material(ID=ID)
- return self.db.select_material(res[0])
+ return query_api(qdict)
def search(self, **opts):
"""
@@ -130,29 +127,22 @@ def material(self, ID):
material=api.material(res[0]['ID'])
print(material.dens, material.rho_n, material.f_of_E(8.0))
"""
- if not self.use_webquery:
- return self.localmaterial(ID)
- self.check()
- try:
- res = self.webquery({"ID": int(ID)})
- except URLError:
- self.use_webquery = False
- return self.localmaterial(ID)
- else:
- f = Formula(res["formula"], sort=False)
- mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={})
- if res.get("name", None):
- mat_data["name"] = res["name"]
- if res.get("mu", 0.0):
- mat_data["mu"] = res["mu"]
- elif res.get("M", 0.0):
- mat_data["M"] = res["M"]
- for key in ["ORSO_validated", "description", "doi", "reference"]:
- if key in res:
- mat_data["extra_data"][key] = res[key]
- out = Material([(get_element(element), amount) for element, amount in f], **mat_data)
- return out
+ res = self.search(ID=int(ID))
+
+ f = Formula(res["formula"], sort=False)
+ mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={})
+ if res.get("name", None):
+ mat_data["name"] = res["name"]
+ if res.get("mu", 0.0):
+ mat_data["mu"] = res["mu"]
+ elif res.get("M", 0.0):
+ mat_data["M"] = res["M"]
+ for key in ["ORSO_validated", "description", "doi", "reference"]:
+ if key in res:
+ mat_data["extra_data"][key] = res[key]
+ out = Material([(get_element(element), amount) for element, amount in f], **mat_data)
+ return out
@staticmethod
def custom(formula, dens=None, fu_volume=None, rho_n=None, mu=0.0, xsld=None, xE=None):
@@ -180,7 +170,7 @@ def bio_blender(self, sequence, molecule="protein"):
Get material for protein, DNA or RNA. Provide a letter sequence and molecule type ('protein', 'dna', 'rna').
"""
opts = {molecule.lower(): sequence, "sldcalc": "true"}
- res = self.webquery(opts)
+ res = self.search(**opts)
mat_data = dict(fu_volume=float(res["fu_volume"]), name=f"BioBlender-{molecule.lower()}", extra_data={})
for key in [
"description",
@@ -190,3 +180,113 @@ def bio_blender(self, sequence, molecule="protein"):
out = Material(Formula(res["formula"]), **mat_data)
return out
+
+
+# webquery API functions:
+def calc_api(args):
+ """Calculate SLD from formula/density or biological sequence.
+
+ args: dict-like with optional keys: formula, density, protein, dna, rna,
+ name, material_description, xray_unit.
+ Returns a JSON string.
+ """
+ if 'protein' in args:
+ try:
+ material = collect_protein(args['protein'])
+ except Exception as e:
+ return repr(e)
+ else:
+ name = args.get('name', 'protein')
+ elif 'dna' in args:
+ try:
+ material = collect_dna(args['dna'])
+ except Exception as e:
+ return repr(e)
+ else:
+ name = args.get('name', 'DNA')
+ elif 'rna' in args:
+ try:
+ material = collect_rna(args['rna'])
+ except Exception as e:
+ return repr(e)
+ else:
+ name = args.get('name', 'RNA')
+ elif 'formula' in args and 'density' in args:
+ f = Formula(args['formula'], sort=False)
+ try:
+ material = Material(f, dens=float(args['density']))
+ except Exception as e:
+ return repr(e)
+ else:
+ name = args.get('name', 'User Query')
+ else:
+ return 'Could not calculate, missing formula and density or protein/dna/rna sequence'
+ material.name = name
+ if args.get('material_description', '') != '':
+ material.extra_data['description'] = args['material_description']
+ out = material.export(xray_units=args.get('xray_unit', 'edens'))
+ return out
+
+
+def select_api(args):
+ """Return JSON for a material selected by ID.
+
+ args: dict-like with keys: ID, and optionally xray_unit.
+ Returns a JSON string.
+ """
+ db = SLDDB(DB_FILE)
+ res = db.search_material(filter_invalid=False, ID=int(args['ID']))
+ try:
+ material = db.select_material(res[0])
+ except IndexError:
+ return '## ID not found in database'
+ except Exception as e:
+ return repr(e) + '
' + "Raised when tried to parse material = %s" % res[0]
+ out = material.export(xray_units=args.get('xray_unit', 'edens'))
+ return out
+
+
+def search_api(args):
+ """Search the database with the given field values.
+
+ args: dict-like mapping DB field names to query values.
+ Returns a JSON string.
+ """
+ query = {}
+ for key, value in args.items():
+ if str(value).strip() == '':
+ continue
+ if key in DB_MATERIALS_FIELDS:
+ try:
+ query[key] = db_lookup[key][1].convert(str(value))
+ except Exception as e:
+ return repr(e) + '
' + "Raised when tried to parse %s = %s" % (key, value)
+ db = SLDDB(DB_FILE)
+ res = db.search_material(serializable=True, limit=10000, **query)
+
+ # remove hidden database fields besides ORSO validation
+ for ri in res:
+ for field in DB_MATERIALS_HIDDEN_DATA:
+ if field.startswith('validated'):
+ continue
+ del ri[field]
+
+ return res
+
+
+def query_api(args):
+ """Dispatch an API request based on which keys are present in args.
+
+ args: dict-like (e.g. request.args or a plain dict).
+ Returns a JSON string.
+ """
+ if 'ID' in args:
+ return select_api(args)
+ elif 'sldcalc' in args:
+ return calc_api(args)
+ elif 'get_fields' in args:
+ return [
+ field for field in DB_MATERIALS_FIELDS if field not in DB_MATERIALS_HIDDEN_DATA
+ ]
+ else:
+ return search_api(args)