-
Notifications
You must be signed in to change notification settings - Fork 8
Unified query api #158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Unified query api #158
Changes from all commits
86f7ecc
5c32376
b20d0d0
27ada1a
10edca9
b26bdcb
a1530c5
430c228
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| """ | ||
| Pure calculation functions for combining biological sequences and material blends. | ||
| No web-framework dependencies. | ||
| """ | ||
|
|
||
| from .dbconfig import DB_FILE | ||
| from .database import SLDDB | ||
| from .material import Material | ||
| from .element_table import get_element | ||
| from .comparators import ExactString | ||
|
|
||
# One-letter amino acid codes mapped to the material names that are looked up
# in the database by exact string match.
# NOTE(review): 'B' and 'Z' are conventionally ambiguity codes (Asx / Glx);
# here they resolve to Aspartate / Glutamate - confirm this is intended.
AMINO_ABRV = {
    "A": "Alanine",
    "R": "Arginine",
    "N": "Asparagine",
    "D": "Aspartate",
    "B": "Aspartate",
    "C": "Cysteine",
    "E": "Glutamate",
    "Q": "Glutamine",
    "Z": "Glutamate",
    "G": "Glycine",
    "H": "Histidine",
    "I": "Isoleucine",
    "L": "Leucine",
    "K": "Lysine",
    "M": "Methionine",
    "F": "Phenylalanine",
    "P": "Proline",
    "S": "Serine",
    "T": "Threonine",
    "W": "Tryptophan",
    "Y": "Tyrosine",
    "V": "Valine",
}


# One-letter RNA base codes mapped to database material names.
RNA_ABRV = {
    "A": "RNA-Adenine",
    "G": "RNA-Guanine",
    "C": "RNA-Cytosine",
    "U": "RNA-Uracil",
}


# One-letter DNA base codes mapped to database material names.
DNA_ABRV = {
    "A": "DNA-Adenine",
    "G": "DNA-Guanine",
    "C": "DNA-Cytosine",
    "T": "DNA-Thymine",
}
|
|
||
|
|
||
class SequenceParseError(ValueError):
    """Raised when a sequence identifier is unknown or its molecule is not in the database."""
|
|
||
|
|
||
def clean_str(string):
    """Return ``string`` with every newline, carriage return, tab and space removed.

    Any other whitespace still left at either end of the result is trimmed too.
    """
    # A single translate pass deletes the four characters wherever they occur.
    without_blanks = string.translate(str.maketrans('', '', '\n\r\t '))
    return without_blanks.strip()
|
|
||
|
|
||
# Material made of two 'Hx' and one 'O' with density 1.0; collect_protein,
# collect_dna and collect_rna each add it once to the assembled sequence.
# NOTE(review): 'Hx' presumably denotes exchangeable hydrogen - confirm in element_table.
hx2o = Material(
    [
        (get_element('Hx'), 2.0),
        (get_element('O'), 1.0),
    ],
    dens=1.0,
)
|
|
||
|
|
||
def collect_combination(ids, name_dict):
    """Add up the database materials named by a sequence of identifiers.

    Args:
        ids: Iterable of identifiers (e.g. the characters of a sequence
            string). A repeated identifier is looked up in the database only
            once and the loaded material is reused.
        name_dict: Mapping from identifier to the material name that is
            searched for with an exact string match.

    Returns:
        The result of adding the materials of all identifiers in input order.

    Raises:
        SequenceParseError: If ``ids`` is empty, an identifier is not a key
            of ``name_dict``, or no database entry carries the mapped name.
    """
    db = SLDDB(DB_FILE)
    elements: list[Material] = []
    loaded_ids: dict[str, Material] = {}
    for identifier in ids:
        if identifier not in loaded_ids:
            # Resolve the name on its own so that only a missing identifier is
            # reported as invalid, not a KeyError raised inside database code.
            try:
                name = name_dict[identifier]
            except KeyError:
                raise SequenceParseError(
                    f"Not a valid identifier {identifier}, options are {''.join(name_dict.keys())}"
                ) from None
            try:
                entry = db.search_material(name=ExactString(name))[0]
            except IndexError:
                raise SequenceParseError(f"Molecule {name} not found in database") from None
            loaded_ids[identifier] = db.select_material(entry)
        elements.append(loaded_ids[identifier])
    if not elements:
        # Previously an empty sequence failed with a bare IndexError below.
        raise SequenceParseError("Empty sequence, no molecules to combine")
    result = elements[0]
    for element in elements[1:]:
        result += element
    return result
|
|
||
|
|
||
def collect_protein(acids):
    """Build the material for an amino-acid letter sequence.

    ``acids`` is stripped of whitespace with clean_str and upper-cased, each
    letter is resolved through AMINO_ABRV, and hx2o is added once to the sum.
    The residue count is stored in ``extra_data['description']``.
    """
    sequence = clean_str(acids).upper()
    chain = collect_combination(sequence, AMINO_ABRV)
    protein = chain + hx2o
    protein.extra_data['description'] = f'protein - {len(sequence)} residues'
    return protein
|
|
||
|
|
||
def collect_dna(bases):
    """Build the material for a DNA base letter sequence.

    ``bases`` is stripped of whitespace with clean_str and upper-cased, each
    letter is resolved through DNA_ABRV, and hx2o is added once to the sum.
    The residue count is stored in ``extra_data['description']``.
    """
    sequence = clean_str(bases).upper()
    strand = collect_combination(sequence, DNA_ABRV)
    dna = strand + hx2o
    dna.extra_data['description'] = f'DNA - {len(sequence)} residues'
    return dna
|
|
||
|
|
||
def collect_rna(bases):
    """Build the material for an RNA base letter sequence.

    ``bases`` is stripped of whitespace with clean_str and upper-cased, each
    letter is resolved through RNA_ABRV, and hx2o is added once to the sum.
    The residue count is stored in ``extra_data['description']``.
    """
    sequence = clean_str(bases).upper()
    strand = collect_combination(sequence, RNA_ABRV)
    rna = strand + hx2o
    rna.extra_data['description'] = f'RNA - {len(sequence)} residues'
    return rna
|
|
||
|
|
||
def collect_blendIDs(formula):
    """Build a blended material from a formula of ``(amount*ID)`` groups.

    Args:
        formula: String such as ``"(0.3*12)(0.7*45)"``. Every parenthesised
            group holds a float amount and an integer database ID separated
            by ``*``. Whitespace around a group, the amount or the ID is
            ignored. Trailing text without a ``(`` is ignored.

    Returns:
        The sum of ``amount * material`` over all groups, in input order.

    Raises:
        SequenceParseError: If the formula contains no group, a group is
            malformed, or an ID has no database entry.
    """
    items = []
    remaining = formula
    while '(' in remaining:
        group, closing, remaining = remaining.partition(')')
        amount_text, star, id_text = group.partition('*')
        if not closing or not star:
            raise SequenceParseError(f"Malformed blend group {group!r}, expected '(amount*ID)'")
        try:
            # Trim whitespace before removing the parenthesis so that groups
            # separated by blanks, e.g. "(1*2) (3*4)", parse as well.
            number = float(amount_text.strip().strip('(').strip())
            ID = int(id_text.strip())
        except ValueError as err:
            raise SequenceParseError(
                f"Malformed blend group {group!r}, expected '(amount*ID)'"
            ) from err
        items.append((number, ID))
    if not items:
        # Previously this case failed with a bare IndexError on elements[0].
        raise SequenceParseError("Blend formula contains no '(amount*ID)' group")

    db = SLDDB(DB_FILE)
    elements: list[Material] = []
    loaded_ids = {}
    for number, ID in items:
        if ID not in loaded_ids:
            try:
                entry = db.search_material(ID=ID)[0]
            except IndexError:
                raise SequenceParseError(f"ID {ID} not found in database") from None
            loaded_ids[ID] = db.select_material(entry)
        elements.append(number * loaded_ids[ID])
    result = elements[0]
    for element in elements[1:]:
        result += element
    return result
|
|
||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'd propose doing this simply with a dictionary of collect functions and `.get`: COLLECT_FUNCTIONS = {
"protein": collect_protein,
"dna": collect_dna,
"rna": collect_rna,
"db": collect_blendIDs,
} |
||
def collect_blend(mtype, idstr):
    """Pass ``idstr`` to the collect function that belongs to ``mtype``.

    ``mtype`` is compared against 'protein', 'dna', 'rna' and 'db' in that
    order; for any other value nothing is called and None is returned.
    """
    collectors = (
        ('protein', collect_protein),
        ('dna', collect_dna),
        ('rna', collect_rna),
        ('db', collect_blendIDs),
    )
    for type_name, collector in collectors:
        if mtype == type_name:
            return collector(idstr)
    return None
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,8 @@ | |
| from urllib.error import URLError | ||
|
|
||
| from . import DB_FILE, SLDDB | ||
| from .dbconfig import WEBAPI_URL | ||
| from .dbconfig import WEBAPI_URL, DB_MATERIALS_FIELDS, DB_MATERIALS_HIDDEN_DATA, db_lookup | ||
| from .blender import collect_protein, collect_dna, collect_rna | ||
| from .element_table import get_element | ||
| from .material import Formula, Material | ||
|
|
||
|
|
@@ -46,13 +47,13 @@ class SLD_API: | |
| max_age = 1 | ||
| db: SLDDB = None | ||
|
|
||
| def __init__(self): | ||
| self.first_access = True | ||
| self.use_webquery = True # only try webquery once, if error occurs switch to local database | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Making the default just a DB download will break the SLDdb statistical functions that are used to improve search results (i.e. returning the most searched and most selected materials first). |
||
| def __init__(self, update_db=True): | ||
| self.update_db = update_db | ||
| self.use_webquery = False # default to using local database, which is updated regularly | ||
|
|
||
| def check(self): | ||
| # make sure the local database file is up to date, if not try to download newest version | ||
| if self.first_access: | ||
| if self.update_db: | ||
| now = datetime.datetime.now() | ||
| try: | ||
| stat = pathlib.Path(DB_FILE).stat() | ||
|
|
@@ -70,7 +71,7 @@ def check(self): | |
| except URLError as err: | ||
| warnings.warn("Can't download new version of database; " + str(err)) | ||
| self.db = SLDDB(DB_FILE) # after potential update, make connection with local database | ||
| self.first_access = False | ||
| self.update_db = False | ||
| else: | ||
| return | ||
|
|
||
|
|
@@ -95,11 +96,7 @@ def webquery(qdict): | |
| return json.loads(webdata.read()) # return decoded data | ||
|
|
||
| def localquery(self, qdict): | ||
| return self.db.search_material(**qdict) | ||
|
|
||
| def localmaterial(self, ID): | ||
| res = self.db.search_material(ID=ID) | ||
| return self.db.select_material(res[0]) | ||
| return query_api(qdict) | ||
|
|
||
| def search(self, **opts): | ||
| """ | ||
|
|
@@ -130,29 +127,22 @@ def material(self, ID): | |
| material=api.material(res[0]['ID']) | ||
| print(material.dens, material.rho_n, material.f_of_E(8.0)) | ||
| """ | ||
| if not self.use_webquery: | ||
| return self.localmaterial(ID) | ||
|
|
||
| self.check() | ||
| try: | ||
| res = self.webquery({"ID": int(ID)}) | ||
| except URLError: | ||
| self.use_webquery = False | ||
| return self.localmaterial(ID) | ||
| else: | ||
| f = Formula(res["formula"], sort=False) | ||
| mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={}) | ||
| if res.get("name", None): | ||
| mat_data["name"] = res["name"] | ||
| if res.get("mu", 0.0): | ||
| mat_data["mu"] = res["mu"] | ||
| elif res.get("M", 0.0): | ||
| mat_data["M"] = res["M"] | ||
| for key in ["ORSO_validated", "description", "doi", "reference"]: | ||
| if key in res: | ||
| mat_data["extra_data"][key] = res[key] | ||
| out = Material([(get_element(element), amount) for element, amount in f], **mat_data) | ||
| return out | ||
| res = self.search(ID=int(ID)) | ||
|
|
||
| f = Formula(res["formula"], sort=False) | ||
| mat_data = dict(dens=float(res["density"]), ID=ID, extra_data={}) | ||
| if res.get("name", None): | ||
| mat_data["name"] = res["name"] | ||
| if res.get("mu", 0.0): | ||
| mat_data["mu"] = res["mu"] | ||
| elif res.get("M", 0.0): | ||
| mat_data["M"] = res["M"] | ||
| for key in ["ORSO_validated", "description", "doi", "reference"]: | ||
| if key in res: | ||
| mat_data["extra_data"][key] = res[key] | ||
| out = Material([(get_element(element), amount) for element, amount in f], **mat_data) | ||
| return out | ||
|
|
||
| @staticmethod | ||
| def custom(formula, dens=None, fu_volume=None, rho_n=None, mu=0.0, xsld=None, xE=None): | ||
|
|
@@ -180,7 +170,7 @@ def bio_blender(self, sequence, molecule="protein"): | |
| Get material for protein, DNA or RNA. Provide a letter sequence and molecule type ('protein', 'dna', 'rna'). | ||
| """ | ||
| opts = {molecule.lower(): sequence, "sldcalc": "true"} | ||
| res = self.webquery(opts) | ||
| res = self.search(**opts) | ||
| mat_data = dict(fu_volume=float(res["fu_volume"]), name=f"BioBlender-{molecule.lower()}", extra_data={}) | ||
| for key in [ | ||
| "description", | ||
|
|
@@ -190,3 +180,113 @@ def bio_blender(self, sequence, molecule="protein"): | |
|
|
||
| out = Material(Formula(res["formula"]), **mat_data) | ||
| return out | ||
|
|
||
|
|
||
| # webquery API functions: | ||
def calc_api(args):
    """Calculate SLD data from a biological sequence or from formula and density.

    Args:
        args: dict-like with optional keys: formula, density, protein, dna,
            rna, name, material_description, xray_unit. The first of
            protein, dna, rna that is present is used; otherwise formula and
            density must both be present.

    Returns:
        On success the value of ``material.export(...)`` (described in this
        module as a JSON string). On failure a plain error string: the
        ``repr`` of the exception raised while building the material, or a
        fixed message when the required keys are missing.
    """
    try:
        if 'protein' in args:
            material = collect_protein(args['protein'])
            default_name = 'protein'
        elif 'dna' in args:
            material = collect_dna(args['dna'])
            default_name = 'DNA'
        elif 'rna' in args:
            material = collect_rna(args['rna'])
            default_name = 'RNA'
        elif 'formula' in args and 'density' in args:
            # The formula is parsed inside the try so that a malformed formula
            # is reported like every other input error instead of propagating.
            f = Formula(args['formula'], sort=False)
            material = Material(f, dens=float(args['density']))
            default_name = 'User Query'
        else:
            return 'Could not calculate, missing formula and density or protein/dna/rna sequence'
    except Exception as e:
        # API boundary: any failure while building the material becomes a message.
        return repr(e)

    material.name = args.get('name', default_name)
    if args.get('material_description', '') != '':
        material.extra_data['description'] = args['material_description']
    return material.export(xray_units=args.get('xray_unit', 'edens'))
|
|
||
|
|
||
def select_api(args):
    """Return the exported data of the material selected by ID.

    Args:
        args: dict-like with key ID and optionally xray_unit. ``args['ID']``
            is converted with ``int``; a failing conversion propagates.

    Returns:
        On success the value of ``material.export(...)`` (described in this
        module as a JSON string). Otherwise an error string: a fixed message
        when the search yields no entry, or the ``repr`` of the exception
        raised by ``select_material`` followed by the offending entry.
    """
    db = SLDDB(DB_FILE)
    res = db.search_material(filter_invalid=False, ID=int(args['ID']))
    # Only an empty search result means "not found"; an IndexError raised
    # inside select_material is reported by the generic handler below.
    try:
        entry = res[0]
    except IndexError:
        return '## ID not found in database'
    try:
        material = db.select_material(entry)
    except Exception as e:
        # The one-element tuple keeps %-formatting valid even if entry is a tuple.
        return repr(e) + '<br >' + "Raised when tried to parse material = %s" % (entry,)
    return material.export(xray_units=args.get('xray_unit', 'edens'))
|
|
||
|
|
||
def search_api(args):
    """Search the database with the given field values.

    Args:
        args: dict-like mapping DB field names to query values. Entries with
            a blank value and keys not in DB_MATERIALS_FIELDS are ignored.

    Returns:
        The search results of ``db.search_material(serializable=True, ...)``
        with the hidden fields removed from every row, except those whose
        name starts with 'validated'. If a value cannot be converted for its
        field, an error string is returned instead.
    """
    query = {}
    for key, value in args.items():
        text = str(value)
        if text.strip() == '' or key not in DB_MATERIALS_FIELDS:
            continue
        try:
            query[key] = db_lookup[key][1].convert(text)
        except Exception as e:
            return repr(e) + '<br >' + "Raised when tried to parse %s = %s" % (key, value)
    db = SLDDB(DB_FILE)
    res = db.search_material(serializable=True, limit=10000, **query)

    # remove hidden database fields besides ORSO validation
    hidden_fields = [
        field for field in DB_MATERIALS_HIDDEN_DATA if not field.startswith('validated')
    ]
    for ri in res:
        for field in hidden_fields:
            # pop() rather than del: a row lacking a hidden field must not
            # abort the whole search with a KeyError.
            ri.pop(field, None)

    return res
|
|
||
|
|
||
def query_api(args):
    """Route an API request to the handler matching the keys present in ``args``.

    args: dict-like (e.g. request.args or a plain dict).
    Precedence: 'ID' -> select_api, 'sldcalc' -> calc_api, 'get_fields' ->
    list of the DB_MATERIALS_FIELDS entries not in DB_MATERIALS_HIDDEN_DATA,
    anything else -> search_api. The handler's result is returned unchanged.
    """
    if 'ID' in args:
        return select_api(args)
    if 'sldcalc' in args:
        return calc_api(args)
    if 'get_fields' not in args:
        return search_api(args)

    visible_fields = []
    for field in DB_MATERIALS_FIELDS:
        if field not in DB_MATERIALS_HIDDEN_DATA:
            visible_fields.append(field)
    return visible_fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't this be simply
return sum(elements)?