diff --git a/simulationworkflowschema/equation_of_state.py b/simulationworkflowschema/equation_of_state.py index edda544..8f6b15e 100644 --- a/simulationworkflowschema/equation_of_state.py +++ b/simulationworkflowschema/equation_of_state.py @@ -21,7 +21,7 @@ from nomad.atomutils import get_volume from nomad.datamodel.data import ArchiveSection from nomad.units import ureg -from nomad.metainfo import SubSection, Section, Quantity +from nomad.metainfo import SubSection, Section, Quantity, MProxy from nomad.datamodel.metainfo.workflow import Link from .general import ( SimulationWorkflowMethod, @@ -30,6 +30,9 @@ WORKFLOW_METHOD_NAME, WORKFLOW_RESULTS_NAME, ) +from .single_point import SinglePoint +from runschema.run import Run, Program +from runschema.system import System class EquationOfStateMethod(SimulationWorkflowMethod): @@ -147,9 +150,110 @@ class EquationOfState(ParallelSimulation): results = SubSection(sub_section=EquationOfStateResults) + # ! For default path code in normalize + # def __init__(self, *args, **kwargs): + # super().__init__(*args, **kwargs) + # self.default_archive_paths = { + # 'input': 'run/0/system/-1', + # 'task': 'workflow2', + # } + + # def get_default_archive_path(self, raw_proxy_value, section_type='') -> str: + # """ + # Returns a certain archive path if the raw proxy value points to the root of the archive. + # """ + # if raw_proxy_value is None: + # return '' + + # if '#/' in raw_proxy_value: + # _, after = raw_proxy_value.split('#/', 1) + # if after: + # return '' + # else: + # return self.default_archive_paths.get(section_type, '') + # else: + # return '' + + def get_section_global_path(self, section: ArchiveSection) -> str: + """ + Returns the global path of a section in the archive. + """ + if isinstance(section, MProxy): + return section.m_proxy_value + else: + archive_root = section.m_root() + archive_metadata = ( + archive_root.metadata if archive_root is not None else None + ) + if not archive_metadata: + return None + + entry_id = archive_metadata.entry_id + path = section.m_path() + return f'../upload/archive/{entry_id}#{path}' if entry_id and path else None + def normalize(self, archive, logger): super().normalize(archive, logger) + flag_input_structure = False + input_path_global = '' + input_archive_root = None + # find the input structure + if self.inputs: + for input_item in self.inputs: + # if isinstance(input_item.section, MProxy): + # input_path_global = input_item.section.m_proxy_value + input_path_global = self.get_section_global_path(input_item.section) + input_section = input_item.section.m_resolved() + + # ! For replacing short-hand sections with standardized paths + # default_path = self.get_default_archive_path( + # input_path_global, section_type='input' + # ) + # logger.warning(f'default_path: {default_path}') + # if default_path != '': + # archive_root = archive.m_context.resolve_archive(input_path_global) + # input_section = archive_root.m_resolve(default_path) + if not isinstance(input_section, System): + continue + + flag_input_structure = True + # ! No longer needed + # archive_root = ( + # archive.m_context.resolve_archive(input_path_global) + # if input_path_global + # else archive_root + # ) + # ! Goes with default path code above + # input_proxy_value = input_path_global + default_path + system_index = input_section.m_parent_index + run_section = input_section.m_parent + run_index = run_section.m_parent_index + input_name = input_item.name + if input_archive_root: + if system_index == -1: + system_index = len(input_archive_root.run[run_index].system) - 1 + if not archive.run: + run = Run(program=Program()) + try: + run.system.extend([input_section]) + run.method.extend(input_archive_root.run[run_index].method) + for calc in input_archive_root.run[run_index].calculation: + if calc.system_ref.m_parent_index == system_index: + run.calculation.extend([calc]) + break + except Exception: + logger.warning( + 'Failed to create run section from input structure. ' + ) + + archive.run.append(run) + + break + + if not flag_input_structure: + logger.warning('No input structure found in EOS workflow normalizer.') + if not self.method: self.method = EquationOfStateMethod() self.inputs.append(Link(name=WORKFLOW_METHOD_NAME, section=self.method)) @@ -158,9 +262,65 @@ def normalize(self, archive, logger): self.results = EquationOfStateResults() self.outputs.append(Link(name=WORKFLOW_RESULTS_NAME, section=self.results)) - if not self._calculations: + #! Causing test to fail + try: + task_archives = [task.task.m_root() for task in self.tasks] + assert all( + isinstance(task_archive.workflow2, SinglePoint) + for task_archive in task_archives + ) + except Exception: + logger.warning( + 'Not all tasks are SinglePoints or failed to retrieve task archives. EOS workflow may be incomplete or incorrect.' + ) return + for task in self.tasks: + # TODO - I need an alternative method to get the full input section path + # ! m_proxy_value is not available for "noraml sections" + # logger.warning(f'task: {task.task}') + # logger.warning(f'task.section: {task.task.section}') + # raw_proxy_value = task.section.m_proxy_value + # default_path = self.get_default_archive_path( + # raw_proxy_value, section_type='task' + # ) + # if default_path: + # # replace the task section with the default for tasks + # # task.section = task.section.m_xpath(default_path) + # archive_root = archive.m_context.resolve_archive(raw_proxy_value) + # task.section = archive_root.m_resolve(default_path) + + # TODO - Add global output to each task output? + + if input_path_global: + # input_proxy_values = [ + # input.section.m_proxy_value for input in task.inputs + # ] + input_proxy_values = [ + self.get_section_global_path(input.section) for input in task.inputs + ] + if input_path_global in input_proxy_values: + index = input_proxy_values.index(input_path_global) + task.inputs[index].name = input_name + else: + task.inputs.append(Link(name=input_name, section=input_section)) + + if not self._calculations: + # try to get calculations from tasks (in case of instantiation from workflow yaml) + try: + self._calculations = [ + task.task.results.calculations_ref[0] for task in self.tasks + ] + except Exception: + pass + + if not self._systems: + # try to get systems from calculations (in case of instantiation from workflow yaml) + try: + self._systems = [calc.system_ref for calc in self._calculations] + except Exception: + pass + if self.results.energies is None: try: self.results.energies = [ @@ -210,4 +370,25 @@ def normalize(self, archive, logger): ) self.results.eos_fit.append(eos_fit) except Exception: - self.logger.warning('EOS fit not succesful.') + logger.warning('EOS fit not succesful.') + + # @staticmethod + # def archive_path_to_jmespath(path: str) -> str: + # """ + # Converts an archive path like 'run/0/system/-1' to a jmespath like 'run[0].system[-1]'. + # """ + # if not path: + # return '' + # parts = path.strip('/').split('/') + # jmes = [] + # i = 0 + # while i < len(parts): + # part = parts[i] + # # If next part is an integer, treat as index + # if i + 1 < len(parts) and parts[i + 1].lstrip('-').isdigit(): + # jmes.append(f"{part}[{parts[i + 1]}]") + # i += 2 + # else: + # jmes.append(part) + # i += 1 + # return '.'.join(jmes) diff --git a/tests/test_simulationworkflowschema.py b/tests/test_simulationworkflowschema.py index cb6005a..d170d5a 100644 --- a/tests/test_simulationworkflowschema.py +++ b/tests/test_simulationworkflowschema.py @@ -735,6 +735,8 @@ def parse_trajectory(filename): return archive +# ! This parser seems to has very strange behavior, none of the basic m_xxx +# ! attributes are defined for the section refs of the workflow def test_eos_workflow(): archive = parse_trajectory('tests/data/ase/Cu.traj')