-
Notifications
You must be signed in to change notification settings - Fork 1
connect to calculations and systems through tasks #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
7b86083
0e91b8e
57e491f
657d680
c27fd1d
1f6d761
88bca74
9ea8ebd
0edcc01
17fc14c
cb3dc2b
d9a05c6
085ef61
c8cf76d
868f62b
cb233fa
393f26f
cee83c5
f840c0f
7c7dde6
3771792
28a1b0b
a6d126f
37a6af1
bc0d77f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,7 @@ | |
| from nomad.atomutils import get_volume | ||
| from nomad.datamodel.data import ArchiveSection | ||
| from nomad.units import ureg | ||
| from nomad.metainfo import SubSection, Section, Quantity | ||
| from nomad.metainfo import SubSection, Section, Quantity, MProxy | ||
| from nomad.datamodel.metainfo.workflow import Link | ||
| from .general import ( | ||
| SimulationWorkflowMethod, | ||
|
|
@@ -30,6 +30,9 @@ | |
| WORKFLOW_METHOD_NAME, | ||
| WORKFLOW_RESULTS_NAME, | ||
| ) | ||
| from .single_point import SinglePoint | ||
| from runschema.run import Run, Program | ||
| from runschema.system import System | ||
|
|
||
|
|
||
| class EquationOfStateMethod(SimulationWorkflowMethod): | ||
|
|
@@ -147,9 +150,110 @@ class EquationOfState(ParallelSimulation): | |
|
|
||
| results = SubSection(sub_section=EquationOfStateResults) | ||
|
|
||
| # ! For default path code in normalize | ||
| # def __init__(self, *args, **kwargs): | ||
| # super().__init__(*args, **kwargs) | ||
| # self.default_archive_paths = { | ||
| # 'input': 'run/0/system/-1', | ||
| # 'task': 'workflow2', | ||
| # } | ||
|
|
||
| # def get_default_archive_path(self, raw_proxy_value, section_type='') -> str: | ||
| # """ | ||
| # Returns a certain archive path if the raw proxy value points to the root of the archive. | ||
| # """ | ||
| # if raw_proxy_value is None: | ||
| # return '' | ||
|
|
||
| # if '#/' in raw_proxy_value: | ||
| # _, after = raw_proxy_value.split('#/', 1) | ||
| # if after: | ||
| # return '' | ||
| # else: | ||
| # return self.default_archive_paths.get(section_type, '') | ||
| # else: | ||
| # return '' | ||
|
|
||
| def get_section_global_path(self, section: ArchiveSection) -> str: | ||
| """ | ||
| Returns the global path of a section in the archive. | ||
| """ | ||
| if isinstance(section, MProxy): | ||
| return section.m_proxy_value | ||
| else: | ||
| archive_root = section.m_root() | ||
| archive_metadata = ( | ||
| archive_root.metadata if archive_root is not None else None | ||
| ) | ||
| if not archive_metadata: | ||
| return None | ||
|
|
||
| entry_id = archive_metadata.entry_id | ||
| path = section.m_path() | ||
| return f'../upload/archive/{entry_id}#{path}' if entry_id and path else None | ||
|
|
||
| def normalize(self, archive, logger): | ||
| super().normalize(archive, logger) | ||
|
|
||
| flag_input_structure = False | ||
| input_path_global = '' | ||
| input_archive_root = None | ||
| # find the input structure | ||
| if self.inputs: | ||
| for input_item in self.inputs: | ||
| # if isinstance(input_item.section, MProxy): | ||
| # input_path_global = input_item.section.m_proxy_value | ||
| input_path_global = self.get_section_global_path(input_item.section) | ||
| input_section = input_item.section.m_resolved() | ||
|
|
||
| # ! For replacing short-hand sections with standardized paths | ||
| # default_path = self.get_default_archive_path( | ||
| # input_path_global, section_type='input' | ||
| # ) | ||
| # logger.warning(f'default_path: {default_path}') | ||
| # if default_path != '': | ||
| # archive_root = archive.m_context.resolve_archive(input_path_global) | ||
| # input_section = archive_root.m_resolve(default_path) | ||
| if not isinstance(input_section, System): | ||
| continue | ||
|
|
||
| flag_input_structure = True | ||
| # ! No longer needed | ||
| # archive_root = ( | ||
| # archive.m_context.resolve_archive(input_path_global) | ||
| # if input_path_global | ||
| # else archive_root | ||
| # ) | ||
| # ! Goes with default path code above | ||
| # input_proxy_value = input_path_global + default_path | ||
| system_index = input_section.m_parent_index | ||
| run_section = input_section.m_parent | ||
| run_index = run_section.m_parent_index | ||
| input_name = input_item.name | ||
| if input_archive_root: | ||
| if system_index == -1: | ||
| system_index = len(input_archive_root.run[run_index].system) - 1 | ||
| if not archive.run: | ||
| run = Run(program=Program()) | ||
| try: | ||
| run.system.extend([input_section]) | ||
| run.method.extend(input_archive_root.run[run_index].method) | ||
| for calc in input_archive_root.run[run_index].calculation: | ||
| if calc.system_ref.m_parent_index == system_index: | ||
| run.calculation.extend([calc]) | ||
| break | ||
| except Exception: | ||
| logger.warning( | ||
| 'Failed to create run section from input structure. ' | ||
| ) | ||
|
|
||
| archive.run.append(run) | ||
|
|
||
| break | ||
|
|
||
| if not flag_input_structure: | ||
| logger.warning('No input structure found in EOS workflow normalizer.') | ||
|
|
||
| if not self.method: | ||
| self.method = EquationOfStateMethod() | ||
| self.inputs.append(Link(name=WORKFLOW_METHOD_NAME, section=self.method)) | ||
|
|
@@ -158,9 +262,65 @@ def normalize(self, archive, logger): | |
| self.results = EquationOfStateResults() | ||
| self.outputs.append(Link(name=WORKFLOW_RESULTS_NAME, section=self.results)) | ||
|
|
||
| if not self._calculations: | ||
| #! Causing test to fail | ||
| try: | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've adjusted the code now so that the test passes without the code within this try statement. When I include this check for single point workflows I get: ERROR nomad.normalizing:metainfo.py:39 {"event": "could not normalize section", "exception": "Traceback (most recent call last):\n File "/home/jfrudzinski/work/soft/nomad-distro-dev-run-schema-2025-04/packages/nomad-FAIR/nomad/normalizing/metainfo.py", line 37, in normalize_section\n normalize(archive, logger)\n File "/home/jfrudzinski/work/soft/nomad-distro-dev-run-schema-2025-04/packages/nomad-FAIR/nomad/datamodel/datamodel.py", line 1233, in normalize\n if not archive.metadata.entry_type:\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\nAttributeError: 'NoneType' object has no attribute 'entry_type'", "normalizer": "MetainfoNormalizer", "section": "EntryArchive", "timestamp": "2025-06-09 16:37.36"}
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I thought I already fixed this months ago. I put a return if metadata is None; maybe it was reverted accidentally. I will fix it.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🙏 🙏 |
||
| task_archives = [task.task.m_root() for task in self.tasks] | ||
| assert all( | ||
| isinstance(task_archive.workflow2, SinglePoint) | ||
| for task_archive in task_archives | ||
| ) | ||
| except Exception: | ||
| logger.warning( | ||
| 'Not all tasks are SinglePoints or failed to retrieve task archives. EOS workflow may be incomplete or incorrect.' | ||
| ) | ||
|
JFRudzinski marked this conversation as resolved.
|
||
| return | ||
|
|
||
| for task in self.tasks: | ||
| # TODO - I need an alternative method to get the full input section path | ||
| # ! m_proxy_value is not available for "noraml sections" | ||
| # logger.warning(f'task: {task.task}') | ||
| # logger.warning(f'task.section: {task.task.section}') | ||
| # raw_proxy_value = task.section.m_proxy_value | ||
| # default_path = self.get_default_archive_path( | ||
| # raw_proxy_value, section_type='task' | ||
| # ) | ||
| # if default_path: | ||
| # # replace the task section with the default for tasks | ||
| # # task.section = task.section.m_xpath(default_path) | ||
| # archive_root = archive.m_context.resolve_archive(raw_proxy_value) | ||
| # task.section = archive_root.m_resolve(default_path) | ||
|
|
||
| # TODO - Add global output to each task output? | ||
|
|
||
| if input_path_global: | ||
| # input_proxy_values = [ | ||
| # input.section.m_proxy_value for input in task.inputs | ||
| # ] | ||
| input_proxy_values = [ | ||
| self.get_section_global_path(input.section) for input in task.inputs | ||
| ] | ||
| if input_path_global in input_proxy_values: | ||
| index = input_proxy_values.index(input_path_global) | ||
| task.inputs[index].name = input_name | ||
| else: | ||
| task.inputs.append(Link(name=input_name, section=input_section)) | ||
|
|
||
| if not self._calculations: | ||
| # try to get calculations from tasks (in case of instantiation from workflow yaml) | ||
| try: | ||
| self._calculations = [ | ||
| task.task.results.calculations_ref[0] for task in self.tasks | ||
| ] | ||
| except Exception: | ||
| pass | ||
|
|
||
| if not self._systems: | ||
| # try to get systems from calculations (in case of instantiation from workflow yaml) | ||
| try: | ||
| self._systems = [calc.system_ref for calc in self._calculations] | ||
| except Exception: | ||
| pass | ||
|
|
||
| if self.results.energies is None: | ||
| try: | ||
| self.results.energies = [ | ||
|
|
@@ -210,4 +370,25 @@ def normalize(self, archive, logger): | |
| ) | ||
| self.results.eos_fit.append(eos_fit) | ||
| except Exception: | ||
| self.logger.warning('EOS fit not succesful.') | ||
| logger.warning('EOS fit not succesful.') | ||
|
|
||
| # @staticmethod | ||
| # def archive_path_to_jmespath(path: str) -> str: | ||
| # """ | ||
| # Converts an archive path like 'run/0/system/-1' to a jmespath like 'run[0].system[-1]'. | ||
| # """ | ||
| # if not path: | ||
| # return '' | ||
| # parts = path.strip('/').split('/') | ||
| # jmes = [] | ||
| # i = 0 | ||
| # while i < len(parts): | ||
| # part = parts[i] | ||
| # # If next part is an integer, treat as index | ||
| # if i + 1 < len(parts) and parts[i + 1].lstrip('-').isdigit(): | ||
| # jmes.append(f"{part}[{parts[i + 1]}]") | ||
| # i += 2 | ||
| # else: | ||
| # jmes.append(part) | ||
| # i += 1 | ||
| # return '.'.join(jmes) | ||
Uh oh!
There was an error while loading. Please reload this page.