From 1c4eea1ffe4e273f58973d2dd0b09c89014eaca1 Mon Sep 17 00:00:00 2001 From: Martin Spetlik Date: Sat, 3 Dec 2022 16:01:31 +0100 Subject: [PATCH 01/17] fullscale transport sim --- mlmc/tool/fullscale_transport_sim.py | 83 ++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 mlmc/tool/fullscale_transport_sim.py diff --git a/mlmc/tool/fullscale_transport_sim.py b/mlmc/tool/fullscale_transport_sim.py new file mode 100644 index 0000000..4ca7e43 --- /dev/null +++ b/mlmc/tool/fullscale_transport_sim.py @@ -0,0 +1,83 @@ +import os +import copy +import shutil +import numpy as np +from typing import * + +import mlmc.random.correlated_field as cf +from typing import List +from mlmc.sim.simulation import Simulation +from mlmc.quantity.quantity_spec import QuantitySpec +from mlmc.level_simulation import LevelSimulation + + +class FullScaleTransportSim(Simulation): + + def __init__(self, config): + """ + :param config: Dict, simulation configuration + """ + #super().__init__() + self._config = config + + def level_instance(self, fine_level_params: List[float], coarse_level_params: List[float]) -> LevelSimulation: + """ + Called from mlmc.Sampler, it creates single instance of LevelSimulation (mlmc.level_simulation) + :param fine_level_params: fine simulation step at particular level + :param coarse_level_params: coarse simulation step at particular level + :return: mlmc.LevelSimulation object + """ + config = copy.deepcopy(self._config) + # Set sample specific parameters + # config["fine"] = {} + # config["coarse"] = {} + # config["fine"]["n_steps"] = fine_level_params[0] + # config["coarse"]["n_steps"] = coarse_level_params[0] + # config["res_format"] = self.result_format() + + return LevelSimulation(config_dict=config, + calculate=FullScaleTransportSim.calculate, + task_size=config["mesh_steps"][fine_level_params[0]], # @TODO: set size + need_sample_workspace=True) + + @staticmethod + def calculate(config, seed): + """ + Calculate fine and coarse sample and also extract their results + :param config: general configuration + :param seed: random number generator seed + :return: np.ndarray, np.ndarray + """ + from endorse.fullscale_transport import fullscale_transport + + from endorse import common + from endorse.common import dotdict, memoize, File, call_flow, workdir, report + from endorse.mesh_class import Mesh + + ################### + ### fine sample ### + ################### + conf_file = os.path.join(config["work_dir"], "test_data/config_homogenisation.yaml") + cfg = common.load_config(conf_file) + cfg.flow_env["flow_executable"] = config["flow_executable"] + cfg["work_dir"] = config["work_dir"] + + source_params = dict(position=10, length=6) + fo = fullscale_transport(cfg, source_params, seed) + fine_res = fo.hydro + + ##################### + ### coarse sample ### + ##################### + coarse_res = 0 + + return fine_res, coarse_res + + def result_format(self) -> List[QuantitySpec]: + """ + Result format + :return: + """ + spec1 = QuantitySpec(name="conductivity", unit="m", shape=(1, 1), times=[1], locations=['0']) + # spec2 = QuantitySpec(name="width", unit="mm", shape=(2, 1), times=[1, 2, 3], locations=['30', '40']) + return [spec1] From f2b5a21ba515dac38f0ffb79e9a053110a57bcdb Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Sun, 4 Dec 2022 16:46:22 +0100 Subject: [PATCH 02/17] merge with changes in JB_full_transport --- mlmc/{tool => sim}/fullscale_transport_sim.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) rename mlmc/{tool => sim}/fullscale_transport_sim.py (92%) diff --git a/mlmc/tool/fullscale_transport_sim.py b/mlmc/sim/fullscale_transport_sim.py similarity index 92% rename from mlmc/tool/fullscale_transport_sim.py rename to mlmc/sim/fullscale_transport_sim.py index 4ca7e43..4712fb2 100644 --- a/mlmc/tool/fullscale_transport_sim.py +++ b/mlmc/sim/fullscale_transport_sim.py @@ -48,6 +48,7 @@ def calculate(config, seed): :param seed: random number generator seed :return: np.ndarray, np.ndarray """ + from endorse.fullscale_transport import fullscale_transport from endorse import common @@ -57,12 +58,16 @@ def calculate(config, seed): ################### ### fine sample ### ################### - conf_file = os.path.join(config["work_dir"], "test_data/config_homogenisation.yaml") + conf_file = os.path.join(config["work_dir"], "test_data/config_homo_tsx.yaml") cfg = common.load_config(conf_file) cfg.flow_env["flow_executable"] = config["flow_executable"] cfg["work_dir"] = config["work_dir"] source_params = dict(position=10, length=6) + cfg_fine = cfg.transport_fullscale + shutil.copy(os.path.join(cfg["work_dir"], cfg_fine.piezo_head_input_file), os.getcwd()) + shutil.copy(os.path.join(cfg["work_dir"], cfg_fine.conc_flux_file), os.getcwd()) + fo = fullscale_transport(cfg, source_params, seed) fine_res = fo.hydro From fec6dcf2588be9c2b041c11ae2ad907b995233f7 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Mon, 5 Dec 2022 09:19:47 +0100 Subject: [PATCH 03/17] update sklearn dependency --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 765d315..f9301bd 100644 --- a/setup.py +++ b/setup.py @@ -61,5 +61,5 @@ def read(*names, **kwargs): # include automatically all files in the template MANIFEST.in include_package_data=True, zip_safe=False, - install_requires=['numpy', 'scipy', 'sklearn', 'h5py>=3.1.0', 'ruamel.yaml', 'attrs', 'gstools', 'memoization'], + install_requires=['numpy', 'scipy', 'scikit-learn', 'h5py>=3.1.0', 'ruamel.yaml', 'attrs', 'gstools', 'memoization'], ) From 2bea0ad243c7e7b57b5119e3d1c10a96464a88b6 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Mon, 5 Dec 2022 09:20:19 +0100 Subject: [PATCH 04/17] simplify passing of large files --- mlmc/sim/fullscale_transport_sim.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/mlmc/sim/fullscale_transport_sim.py b/mlmc/sim/fullscale_transport_sim.py index 4712fb2..50bd542 100644 --- a/mlmc/sim/fullscale_transport_sim.py +++ b/mlmc/sim/fullscale_transport_sim.py @@ -58,17 +58,13 @@ def calculate(config, seed): ################### ### fine sample ### ################### - conf_file = os.path.join(config["work_dir"], "test_data/config_homo_tsx.yaml") - cfg = common.load_config(conf_file) - cfg.flow_env["flow_executable"] = config["flow_executable"] - cfg["work_dir"] = config["work_dir"] - source_params = dict(position=10, length=6) - cfg_fine = cfg.transport_fullscale - shutil.copy(os.path.join(cfg["work_dir"], cfg_fine.piezo_head_input_file), os.getcwd()) - shutil.copy(os.path.join(cfg["work_dir"], cfg_fine.conc_flux_file), os.getcwd()) + #conf_file = os.path.join(config["work_dir"], "test_data/config_homo_tsx.yaml") + #cfg = common.load_config(conf_file) + #cfg.flow_env["flow_executable"] = config["flow_executable"] + #cfg["work_dir"] = config["work_dir"] - fo = fullscale_transport(cfg, source_params, seed) + fo = fullscale_transport(config['main_cfg_file'], config['source_params'], seed) fine_res = fo.hydro ##################### From 618da8a92116adbaac7e690be59afb4e514dd508 Mon Sep 17 00:00:00 2001 From: Martin Spetlik Date: Fri, 9 Dec 2022 19:07:34 +0100 Subject: [PATCH 05/17] pbs commands --- mlmc/sampling_pool_pbs.py | 36 ++++++++++++++++++---------- mlmc/tool/pbs_commands.py | 50 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 12 deletions(-) create mode 100644 mlmc/tool/pbs_commands.py diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index 01c4128..677d7f4 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -8,6 +8,7 @@ from mlmc.level_simulation import LevelSimulation from mlmc.sampling_pool import SamplingPool from mlmc.tool.pbs_job import PbsJob +from mlmc.tool.pbs_commands import PbsCommands """ SamplingPoolPBS description @@ -90,6 +91,17 @@ def __init__(self, work_dir, debug=False): self._qsub_failed_n = 0 self._qstat_failed_n = 0 # Number of failed execution of commands qsub, qstat + self._pbs_commands = None + + @property + def pbs_commands(self): + if self._pbs_commands is None: + self._pbs_commands = PbsCommands() + return self._pbs_commands + + @pbs_commands.setter + def pbs_commands(self, pbs_commands_obj): + self._pbs_commands = pbs_commands_obj def _get_job_count(self): """ @@ -220,18 +232,18 @@ def execute(self): job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) script_content = "\n".join(self.pbs_script) self.write_script(script_content, job_file) + process = self.pbs_commands.qsub([job_file]) - process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE) try: - if process.returncode != 0: - raise Exception(process.stderr.decode('ascii')) + if process.status != 0: + raise Exception(process.stderr) # Find all finished jobs self._qsub_failed_n = 0 # Write current job count self._job_count += 1 # Get pbs_id from qsub output - pbs_id = process.stdout.decode("ascii").split(".")[0] + pbs_id = process.stdout.split(".")[0] # Store pbs id for future qstat calls self._pbs_ids.append(pbs_id) pbs_process.write_pbs_id(pbs_id) @@ -242,7 +254,7 @@ def execute(self): except: self._qsub_failed_n += 1 if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: - raise Exception(process.stderr.decode("ascii")) + raise Exception(process.stderr) def _create_script(self): """ @@ -286,22 +298,22 @@ def _qstat_pbs_job(self): if len(self._pbs_ids) > 0: # Get PBS id's status, # '-x' - displays status information for finished and moved jobs in addition to queued and running jobs. - qstat_call = ["qstat", "-x"] - qstat_call.extend(self._pbs_ids) + qstat_args = ["-x"] + qstat_args.extend(self._pbs_ids) # qstat call - process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE) + process = self.pbs_commands.qstat(qstat_args) try: - if process.returncode != 0: - raise Exception(process.stderr.decode("ascii")) - output = process.stdout.decode("ascii") + if process.status != 0: + raise Exception(process.stderr) + output = process.stdout # Find all finished jobs finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output) self._qstat_failed_n = 0 except: self._qstat_failed_n += 1 if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N: - raise Exception(process.stderr.decode("ascii")) + raise Exception(process.stderr) finished_pbs_jobs = [] # Get unfinished as diff between planned and finished diff --git a/mlmc/tool/pbs_commands.py b/mlmc/tool/pbs_commands.py new file mode 100644 index 0000000..550f02a --- /dev/null +++ b/mlmc/tool/pbs_commands.py @@ -0,0 +1,50 @@ +import subprocess +import attr +from abc import ABC, abstractmethod +from mlmc.sampling_pool import SamplingPool + + +class PbsCommandsAbstract(ABC): + + @abstractmethod + def qsub(self, args): + """ + :return: CommandOutput instance + """ + + @abstractmethod + def qstat(self, args): + """ + :return: CommandOutput instance + """ + + +class PbsCommands(PbsCommandsAbstract): + + def qsub(self, args): + process = self._run_command("qsub", args) + + return CommandOutput(status=process.returncode, + stdout=process.stdout.decode("ascii"), + stderr=process.stderr.decode("ascii")) + + def qstat(self, args): + process = self._run_command("qstat", args) + + return CommandOutput(status=process.returncode, + stdout=process.stdout.decode("ascii"), + stderr=process.stderr.decode("ascii")) + + @staticmethod + def _run_command(command, args): + return subprocess.run([command, *args], stderr=subprocess.PIPE, stdout=subprocess.PIPE) + + + +@attr.s(auto_attribs=True) +class CommandOutput: + status: int + stdout: str = None + stderr: str = None + + From ce5f8c1c6ffcee4debbe514dd0fe8ff4a0be8483 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Thu, 29 Dec 2022 23:13:03 +0100 Subject: [PATCH 06/17] replace unsupported np.float --- mlmc/sample_storage_hdf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mlmc/sample_storage_hdf.py b/mlmc/sample_storage_hdf.py index 7e5fbef..2fa70c0 100644 --- a/mlmc/sample_storage_hdf.py +++ b/mlmc/sample_storage_hdf.py @@ -39,7 +39,7 @@ def _hdf_result_format(self, locations, times): :return: """ if len(locations[0]) == 3: - tuple_dtype = np.dtype((np.float, (3,))) + tuple_dtype = np.dtype((float, (3,))) loc_dtype = np.dtype((tuple_dtype, (len(locations),))) else: loc_dtype = np.dtype(('S50', (len(locations),))) @@ -48,14 +48,14 @@ def _hdf_result_format(self, locations, times): 'formats': ('S50', 'S50', np.dtype((np.int32, (2,))), - np.dtype((np.float, (len(times),))), + np.dtype((float, (len(times),))), loc_dtype ) } return result_dtype - def save_global_data(self, level_parameters: List[np.float], result_format: List[QuantitySpec]): + def save_global_data(self, level_parameters: List[float], result_format: List[QuantitySpec]): """ Save hdf5 file global attributes :param level_parameters: list of simulation steps From e2ff85616eb293d269348b1a3ba005f1f8c29426 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Fri, 30 Dec 2022 00:26:25 +0100 Subject: [PATCH 07/17] fix simulation result --- mlmc/sim/fullscale_transport_sim.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/mlmc/sim/fullscale_transport_sim.py b/mlmc/sim/fullscale_transport_sim.py index 50bd542..8404072 100644 --- a/mlmc/sim/fullscale_transport_sim.py +++ b/mlmc/sim/fullscale_transport_sim.py @@ -64,21 +64,25 @@ def calculate(config, seed): #cfg.flow_env["flow_executable"] = config["flow_executable"] #cfg["work_dir"] = config["work_dir"] - fo = fullscale_transport(config['main_cfg_file'], config['source_params'], seed) - fine_res = fo.hydro + val = fullscale_transport(config['main_cfg_file'], config['source_params'], seed) + q10 = list(val) + add_values = (10 - len(q10)) * [0.0] + q10.extend(add_values) #fixed_indicators[:len(ind_time_max)] = np.array(ind_time_max) + res_fine = np.asarray(q10) + #fine_res = fo.hydro ##################### ### coarse sample ### ##################### - coarse_res = 0 + res_coarse = np.zeros_like(res_fine) - return fine_res, coarse_res + return res_fine, res_coarse def result_format(self) -> List[QuantitySpec]: """ Result format :return: """ - spec1 = QuantitySpec(name="conductivity", unit="m", shape=(1, 1), times=[1], locations=['0']) + spec1 = QuantitySpec(name="indicator_conc", unit="g/m3", shape=(10, 1), times=[1], locations=['0']) # spec2 = QuantitySpec(name="width", unit="mm", shape=(2, 1), times=[1, 2, 3], locations=['30', '40']) return [spec1] From ee0f04038312511645f0a8a252210f1c8d1747be Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Fri, 30 Dec 2022 15:03:20 +0100 Subject: [PATCH 08/17] Eradicate np.float --- mlmc/sample_storage.py | 2 +- mlmc/tool/hdf5.py | 2 +- test/test_hdf.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mlmc/sample_storage.py b/mlmc/sample_storage.py index 01be908..784637f 100644 --- a/mlmc/sample_storage.py +++ b/mlmc/sample_storage.py @@ -171,7 +171,7 @@ def _save_successful(self, samples): res = np.array(res) fine_coarse_res = res[:, 1] - result_type = np.dtype((np.float, np.array(fine_coarse_res[0]).shape)) + result_type = np.dtype((float, np.array(fine_coarse_res[0]).shape)) results = np.empty(shape=(len(res),), dtype=result_type) results[:] = [val for val in fine_coarse_res] diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index f6f3219..32e2414 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -309,7 +309,7 @@ def append_successful(self, samples: np.array): self._append_dataset(self.collected_ids_dset, samples[:, 0]) values = samples[:, 1] - result_type = np.dtype((np.float, np.array(values[0]).shape)) + result_type = np.dtype((float, np.array(values[0]).shape)) # Create dataset for failed samples self._make_dataset(name='collected_values', shape=(0,), diff --git a/test/test_hdf.py b/test/test_hdf.py index 8778a60..12569f4 100644 --- a/test/test_hdf.py +++ b/test/test_hdf.py @@ -87,7 +87,7 @@ def load_from_file(hdf_obj, obligatory_attributes): SCHEDULED_SAMPLES = ['L00_S0000000', 'L00_S0000001', 'L00_S0000002', 'L00_S0000003', 'L00_S0000004'] -RESULT_DATA_DTYPE = [("value", np.float), ("time", np.float)] +RESULT_DATA_DTYPE = [("value", float), ("time", float)] COLLECTED_SAMPLES = np.array([['L00S0000000', (np.array([10, 20]), np.array([5, 6]))], ['L00S0000001', (np.array([1, 2]), np.array([50, 60]))]]) From 972e77866e68ecb8b70111ad678f0f45a92c1655 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Tue, 3 Jan 2023 08:41:11 +0100 Subject: [PATCH 09/17] Fix incompatibility with new Numpy, add error reporting. --- mlmc/sample_storage_hdf.py | 3 +- mlmc/sampling_pool.py | 4 +- mlmc/sim/fullscale_transport_sim.py | 88 ----------------------------- mlmc/tool/hdf5.py | 25 ++++---- 4 files changed, 19 insertions(+), 101 deletions(-) delete mode 100644 mlmc/sim/fullscale_transport_sim.py diff --git a/mlmc/sample_storage_hdf.py b/mlmc/sample_storage_hdf.py index 2fa70c0..15f170d 100644 --- a/mlmc/sample_storage_hdf.py +++ b/mlmc/sample_storage_hdf.py @@ -125,7 +125,8 @@ def save_samples(self, successful, failed): def _save_succesful(self, successful_samples): for level, samples in successful_samples.items(): if len(samples) > 0: - self._level_groups[level].append_successful(np.array(samples)) + ids, sample_values = zip(*samples) + self._level_groups[level].append_successful(np.array(ids, dtype=str), np.array(sample_values, dtype=float)) def _save_failed(self, failed_samples): for level, samples in failed_samples.items(): diff --git a/mlmc/sampling_pool.py b/mlmc/sampling_pool.py index 0d40232..01800cc 100644 --- a/mlmc/sampling_pool.py +++ b/mlmc/sampling_pool.py @@ -122,6 +122,7 @@ def calculate_sample(sample_id, level_sim, work_dir=None, seed=None): except Exception: str_list = traceback.format_exception(*sys.exc_info()) err_msg = "".join(str_list) + print(err_msg) return sample_id, res, err_msg, running_time @@ -241,7 +242,8 @@ def _process_result(self, sample_id, result, err_msg, running_time, level_sim): self._save_running_time(level_sim._level_id, running_time) if not err_msg: - self._queues.setdefault(level_sim._level_id, queue.Queue()).put((sample_id, (result[0], result[1]))) + level_queue = self._queues.setdefault(level_sim._level_id, queue.Queue()) + level_queue.put((sample_id, (result[0], result[1]))) if not self._debug: SamplingPool.move_successful_rm(sample_id, level_sim, output_dir=self._output_dir, dest_dir=self._successful_dir) else: diff --git a/mlmc/sim/fullscale_transport_sim.py b/mlmc/sim/fullscale_transport_sim.py deleted file mode 100644 index 8404072..0000000 --- a/mlmc/sim/fullscale_transport_sim.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -import copy -import shutil -import numpy as np -from typing import * - -import mlmc.random.correlated_field as cf -from typing import List -from mlmc.sim.simulation import Simulation -from mlmc.quantity.quantity_spec import QuantitySpec -from mlmc.level_simulation import LevelSimulation - - -class FullScaleTransportSim(Simulation): - - def __init__(self, config): - """ - :param config: Dict, simulation configuration - """ - #super().__init__() - self._config = config - - def level_instance(self, fine_level_params: List[float], coarse_level_params: List[float]) -> LevelSimulation: - """ - Called from mlmc.Sampler, it creates single instance of LevelSimulation (mlmc.level_simulation) - :param fine_level_params: fine simulation step at particular level - :param coarse_level_params: coarse simulation step at particular level - :return: mlmc.LevelSimulation object - """ - config = copy.deepcopy(self._config) - # Set sample specific parameters - # config["fine"] = {} - # config["coarse"] = {} - # config["fine"]["n_steps"] = fine_level_params[0] - # config["coarse"]["n_steps"] = coarse_level_params[0] - # config["res_format"] = self.result_format() - - return LevelSimulation(config_dict=config, - calculate=FullScaleTransportSim.calculate, - task_size=config["mesh_steps"][fine_level_params[0]], # @TODO: set size - need_sample_workspace=True) - - @staticmethod - def calculate(config, seed): - """ - Calculate fine and coarse sample and also extract their results - :param config: general configuration - :param seed: random number generator seed - :return: np.ndarray, np.ndarray - """ - - from endorse.fullscale_transport import fullscale_transport - - from endorse import common - from endorse.common import dotdict, memoize, File, call_flow, workdir, report - from endorse.mesh_class import Mesh - - ################### - ### fine sample ### - ################### - - #conf_file = os.path.join(config["work_dir"], "test_data/config_homo_tsx.yaml") - #cfg = common.load_config(conf_file) - #cfg.flow_env["flow_executable"] = config["flow_executable"] - #cfg["work_dir"] = config["work_dir"] - - val = fullscale_transport(config['main_cfg_file'], config['source_params'], seed) - q10 = list(val) - add_values = (10 - len(q10)) * [0.0] - q10.extend(add_values) #fixed_indicators[:len(ind_time_max)] = np.array(ind_time_max) - res_fine = np.asarray(q10) - #fine_res = fo.hydro - - ##################### - ### coarse sample ### - ##################### - res_coarse = np.zeros_like(res_fine) - - return res_fine, res_coarse - - def result_format(self) -> List[QuantitySpec]: - """ - Result format - :return: - """ - spec1 = QuantitySpec(name="indicator_conc", unit="g/m3", shape=(10, 1), times=[1], locations=['0']) - # spec2 = QuantitySpec(name="width", unit="mm", shape=(2, 1), times=[1, 2, 3], locations=['30', '40']) - return [spec1] diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index 32e2414..6fe8076 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -112,12 +112,14 @@ def add_level_group(self, level_id): # HDF5 path to particular level group level_group_hdf_path = '/Levels/' + level_id - with h5py.File(self.file_name, "a") as hdf_file: - # Create group (h5py.Group) if it has not yet been created - if level_group_hdf_path not in hdf_file: - # Create group for level named by level id (e.g. 0, 1, 2, ...) - hdf_file['Levels'].create_group(level_id) - + try: + with h5py.File(self.file_name, "a") as hdf_file: + # Create group (h5py.Group) if it has not yet been created + if level_group_hdf_path not in hdf_file: + # Create group for level named by level id (e.g. 0, 1, 2, ...) + hdf_file['Levels'].create_group(level_id) + except BlockingIOError as e: + raise BlockingIOError(f"Unable to lock file: {self.file_name}") return LevelGroup(self.file_name, level_group_hdf_path, level_id, loaded_from_file=self._load_from_file) @property @@ -300,16 +302,17 @@ def append_scheduled(self, scheduled_samples): if len(scheduled_samples) > 0: self._append_dataset(self.scheduled_dset, scheduled_samples) - def append_successful(self, samples: np.array): + def append_successful(self, sample_ids:np.array, samples: np.array): """ Save level samples to datasets (h5py.Dataset), save ids of collected samples and their results :param samples: np.ndarray :return: None """ - self._append_dataset(self.collected_ids_dset, samples[:, 0]) + assert samples.shape[0] == len(sample_ids) + assert samples.shape[1] == 2 + self._append_dataset(self.collected_ids_dset, sample_ids) - values = samples[:, 1] - result_type = np.dtype((float, np.array(values[0]).shape)) + result_type = np.dtype((float, np.array(samples[0]).shape)) # Create dataset for failed samples self._make_dataset(name='collected_values', shape=(0,), @@ -317,7 +320,7 @@ def append_successful(self, samples: np.array): chunks=True) d_name = 'collected_values' - self._append_dataset(d_name, [val for val in values]) + self._append_dataset(d_name, [val for val in samples]) def append_failed(self, failed_samples): """ From 9e2f88c46db32aedc389658ea7983432461cabbb Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Thu, 12 Jan 2023 13:22:03 +0100 Subject: [PATCH 10/17] fix spurious error in call of qsub --- mlmc/sampling_pool_pbs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index 677d7f4..8f35afe 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -232,7 +232,7 @@ def execute(self): job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) script_content = "\n".join(self.pbs_script) self.write_script(script_content, job_file) - process = self.pbs_commands.qsub([job_file]) + process = self.pbs_commands.qsub(['--', job_file]) try: if process.status != 0: From 1b75ccad6c9f1ec10ab17ac81128e9f3300157c0 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Fri, 13 Jan 2023 16:23:46 +0100 Subject: [PATCH 11/17] Fix qsub call, improve failure reporting --- mlmc/sampling_pool.py | 18 ++++++++++++++++++ mlmc/sampling_pool_pbs.py | 17 ++++++++--------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/mlmc/sampling_pool.py b/mlmc/sampling_pool.py index 01800cc..df6e0ba 100644 --- a/mlmc/sampling_pool.py +++ b/mlmc/sampling_pool.py @@ -136,6 +136,24 @@ def change_to_sample_directory(work_dir, path: str): sample_dir = os.path.join(work_dir, path) if not os.path.isdir(sample_dir): os.makedirs(sample_dir, mode=0o775, exist_ok=True) + + # We have observed possible problems with directory creation on the + # network filesystem. However it is not sure that was the cause of the problem. + # This code would make sure that we have write access to the created directory. + + # Commented out in order to try to reproduce the problem. + #for i in range(30): + #try: + #with open(os.path.join(sample_dir, "_test_file.txt"), "w") as f: + #f.write("test") + #except: + #time.sleep(1) + #continue + #print("Workdir ready after {i} sleep seconds.") + #break + #else: + #print("Workdir still not ready ready after {i} sleep seconds.") + return sample_dir @staticmethod diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index 8f35afe..65491bf 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -232,11 +232,14 @@ def execute(self): job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) script_content = "\n".join(self.pbs_script) self.write_script(script_content, job_file) - process = self.pbs_commands.qsub(['--', job_file]) + process = self.pbs_commands.qsub([job_file]) - try: - if process.status != 0: - raise Exception(process.stderr) + if process.status != 0: + self._qsub_failed_n += 1 + print(f"\nWARNING: FAILED QSUB, {self._qsub_failed_n} consecutive\n: {process}") + if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: + raise Exception(str(process)) + else: # Find all finished jobs self._qsub_failed_n = 0 # Write current job count @@ -251,10 +254,6 @@ def execute(self): self._current_job_weight = 0 self._n_samples_in_job = 0 self._scheduled = [] - except: - self._qsub_failed_n += 1 - if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: - raise Exception(process.stderr) def _create_script(self): """ @@ -266,7 +265,7 @@ def _create_script(self): self._pbs_config['pbs_output_dir'] = self._jobs_dir self._pbs_config['output_dir'] = self._output_dir self._pbs_config['work_dir'] = self._work_dir - + self.pbs_script = [line.format(**self._pbs_config) for line in self._pbs_header_template] def write_script(self, content, job_file): From 492c926eccfe8cba18edf8917716665e20b67d6d Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Thu, 19 Jan 2023 21:33:46 +0100 Subject: [PATCH 12/17] Simplify qsub call. --- mlmc/sampling_pool_pbs.py | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index 677d7f4..ae2c26d 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -232,29 +232,27 @@ def execute(self): job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) script_content = "\n".join(self.pbs_script) self.write_script(script_content, job_file) - process = self.pbs_commands.qsub([job_file]) - try: - if process.status != 0: - raise Exception(process.stderr) - # Find all finished jobs - self._qsub_failed_n = 0 - # Write current job count - self._job_count += 1 - - # Get pbs_id from qsub output - pbs_id = process.stdout.split(".")[0] - # Store pbs id for future qstat calls - self._pbs_ids.append(pbs_id) - pbs_process.write_pbs_id(pbs_id) - - self._current_job_weight = 0 - self._n_samples_in_job = 0 - self._scheduled = [] - except: + process = self.pbs_commands.qsub([job_file]) + if process.status != 0: self._qsub_failed_n += 1 + print(f"FAILED QSUB {self._qsub_failed_n}: {process}") if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: - raise Exception(process.stderr) + raise Exception(process) + # Find all finished jobs + self._qsub_failed_n = 0 + # Write current job count + self._job_count += 1 + + # Get pbs_id from qsub output + pbs_id = process.stdout.split(".")[0] + # Store pbs id for future qstat calls + self._pbs_ids.append(pbs_id) + pbs_process.write_pbs_id(pbs_id) + + self._current_job_weight = 0 + self._n_samples_in_job = 0 + self._scheduled = [] def _create_script(self): """ From 1d755db60a95c923d8e8544f1e76a6bf95a1017a Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Thu, 19 Jan 2023 21:35:21 +0100 Subject: [PATCH 13/17] Implement and use File open waiting for the lock. --- mlmc/tool/hdf5.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index 6fe8076..d42b0f7 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -1,6 +1,28 @@ import numpy as np import h5py from mlmc.quantity.quantity_spec import ChunkSpec +import time + +class FileSafe(h5py.File): + """ + Context manager for openning HDF5 files with some timeout + amd retrying of getting acces.creation and usage of a workspace dir. + """ + def __init__(self, filename:str, mode='r', timeout=5, **kwargs): + """ + :param filename: + :param timeout: time to try acquire the lock + """ + end_time = time.time() + timeout + while time.time() < end_time: + try: + super().__init__(filename, **kwargs) + return + except BlockingIOError as e: + time.sleep(0.01) + continue + break + raise BlockingIOError(f"Unable to lock access to HDF5 file: {filename}, give up after: {timeout}s.") class HDF5: @@ -349,14 +371,15 @@ def scheduled(self): Read level dataset with scheduled samples :return: """ - with h5py.File(self.file_name, 'r') as hdf_file: + with FileSafe(self.file_name, 'r') as hdf_file: scheduled_dset = hdf_file[self.level_group_path][self.scheduled_dset] return scheduled_dset[()] def chunks(self, n_samples=None): with h5py.File(self.file_name, 'r') as hdf_file: if 'collected_values' not in hdf_file[self.level_group_path]: - raise AttributeError("No collected values in level group ".format(self.level_id)) + raise AttributeError(f"No collected values for level {self.level_id} at {self.file_name}:{self.level_group_path}." + f"Found keys: {list(hdf_file[self.level_group_path].keys())}") dataset = hdf_file["/".join([self.level_group_path, "collected_values"])] if n_samples is not None: From 8ff30b69bbb15de15322f392b99161ca3a8719a3 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Thu, 19 Jan 2023 21:40:34 +0100 Subject: [PATCH 14/17] Support result format with qunatities of different shape. --- mlmc/sample_storage_hdf.py | 43 +++++++------------------ mlmc/tool/hdf5.py | 64 ++++++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 58 deletions(-) diff --git a/mlmc/sample_storage_hdf.py b/mlmc/sample_storage_hdf.py index 15f170d..4ebf7ca 100644 --- a/mlmc/sample_storage_hdf.py +++ b/mlmc/sample_storage_hdf.py @@ -33,27 +33,6 @@ def __init__(self, file_path): for i_level in range(len(self._hdf_object.level_parameters)): self._level_groups.append(self._hdf_object.add_level_group(str(i_level))) - def _hdf_result_format(self, locations, times): - """ - QuantitySpec data type, necessary for hdf storage - :return: - """ - if len(locations[0]) == 3: - tuple_dtype = np.dtype((float, (3,))) - loc_dtype = np.dtype((tuple_dtype, (len(locations),))) - else: - loc_dtype = np.dtype(('S50', (len(locations),))) - - result_dtype = {'names': ('name', 'unit', 'shape', 'times', 'locations'), - 'formats': ('S50', - 'S50', - np.dtype((np.int32, (2,))), - np.dtype((float, (len(times),))), - loc_dtype - ) - } - - return result_dtype def save_global_data(self, level_parameters: List[float], result_format: List[QuantitySpec]): """ @@ -62,7 +41,6 @@ def save_global_data(self, level_parameters: List[float], result_format: List[Qu :param result_format: simulation result format :return: None """ - res_dtype = self._hdf_result_format(result_format[0].locations, result_format[0].times) # Create file structure self._hdf_object.create_file_structure(level_parameters) @@ -73,7 +51,7 @@ def save_global_data(self, level_parameters: List[float], result_format: List[Qu self._level_groups.append(self._hdf_object.add_level_group(str(i_level))) # Save result format (QuantitySpec) - self.save_result_format(result_format, res_dtype) + self.save_result_format(result_format) def load_scheduled_samples(self): """ @@ -85,7 +63,7 @@ def load_scheduled_samples(self): scheduled[int(level.level_id)] = [sample[0].decode() for sample in level.scheduled()] return scheduled - def save_result_format(self, result_format: List[QuantitySpec], res_dtype): + def save_result_format(self, result_format: List[QuantitySpec]): """ Save result format to hdf :param result_format: List[QuantitySpec] @@ -96,20 +74,21 @@ def save_result_format(self, result_format: List[QuantitySpec], res_dtype): raise ValueError('You are setting a new different result format for an existing sample storage') except AttributeError: pass - self._hdf_object.save_result_format(result_format, res_dtype) + self._hdf_object.save_result_format(result_format) + + @staticmethod + def make_qspec(name, unit, shape, times, locations): + return QuantitySpec(name.decode(), unit.decode(), shape, times, [loc.decode() for loc in locations]) def load_result_format(self) -> List[QuantitySpec]: """ Load result format """ results_format = self._hdf_object.load_result_format() - quantities = [] - for res_format in results_format: - spec = QuantitySpec(res_format[0].decode(), res_format[1].decode(), res_format[2], res_format[3], - [loc.decode() for loc in res_format[4]]) - - quantities.append(spec) - + quantities = [ + self.make_qspec(*res_format[0]) + for ispec, res_format in sorted(results_format.items()) + ] return quantities def save_samples(self, successful, failed): diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index d42b0f7..fa720ae 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -152,39 +152,50 @@ def result_format_dset_name(self): """ return "result_format" - def save_result_format(self, result_format, res_dtype): + def single_format(self, spec:"QuantitySpec"): + # point or named region + loc_dtype = np.dtype((float, (3,))) if len(spec.locations) == 3 else 'S50' + locations_dtype = np.dtype((loc_dtype, (len(spec.locations),))) + result_dtype = {'names': ('name','unit', 'shape', 'times', 'locations'), + 'formats': ('S50', + 'S50', + np.dtype((np.int32, (2,))), + np.dtype((float, (len(spec.times),))), + locations_dtype + ) + } + format_items = (spec.name, spec.unit, spec.shape, spec.times, spec.locations) + res_format = np.array([format_items], dtype=result_dtype) + return res_format, result_dtype + + + def save_result_format(self, result_format): """ Save result format to dataset :param result_format: List[QuantitySpec] :param res_dtype: result numpy dtype :return: None """ - result_format_dtype = res_dtype - - # Create data set with h5py.File(self.file_name, 'a') as hdf_file: - # Check if dataset exists + # format item in main group if self.result_format_dset_name not in hdf_file: - hdf_file.create_dataset( - self.result_format_dset_name, - shape=(len(result_format),), - dtype=result_format_dtype, - maxshape=(None,), - chunks=True) - - # Format data - result_array = np.empty((len(result_format),), dtype=result_format_dtype) - for res, quantity_spec in zip(result_array, result_format): - for attribute in list(quantity_spec.__dict__.keys()): - if isinstance(getattr(quantity_spec, attribute), (tuple, list)): - res[attribute][:] = getattr(quantity_spec, attribute) + format_group = hdf_file.create_group(self.result_format_dset_name) + else: + format_group = hdf_file[self.result_format_dset_name] + # dataset item qith format spec for every QuantitySpec + for ispec, qspec in enumerate(result_format): + ispec = f"{ispec:04d}" + format, format_dtype = self.single_format(qspec) + if ispec not in format_group: + quantity_dset = format_group.create_dataset( + name=ispec, + shape=(1,), + dtype=format_dtype, + maxshape=(None,), + chunks=True) else: - res[attribute] = getattr(quantity_spec, attribute) - - # Write to file - with h5py.File(self.file_name, 'a') as hdf_file: - dataset = hdf_file[self.result_format_dset_name] - dataset[:] = result_array + quantity_dset = format_group[ispec] + quantity_dset[0] = format def load_result_format(self): """ @@ -195,8 +206,9 @@ def load_result_format(self): if self.result_format_dset_name not in hdf_file: raise AttributeError - dataset = hdf_file[self.result_format_dset_name] - return dataset[()] + format_group = hdf_file[self.result_format_dset_name] + format = {ispec: np.array(q_dset) for ispec, q_dset in format_group.items()} + return format def load_level_parameters(self): with h5py.File(self.file_name, "r") as hdf_file: From 11fcb33c405d000ac7b87169192677549021b0e9 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Sat, 21 Jan 2023 16:43:47 +0100 Subject: [PATCH 15/17] Merge branch 'MS_endorse' of github.com:GeoMop/MLMC into MS_endorse --- mlmc/sampling_pool_pbs.py | 56 ++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index a36d00c..3df7ed3 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -212,33 +212,35 @@ def execute(self): Execute pbs script :return: None """ - if len(self._scheduled) > 0: - job_id = "{:04d}".format(self._job_count) - # Create pbs job - pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id, - SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug) - - pbs_process.save_sample_id_job_id(job_id, self._scheduled) - # Write scheduled samples to file - pbs_process.save_scheduled(self._scheduled) - - # Format pbs script - self._create_script() - - if self.pbs_script is None or self._n_samples_in_job == 0: - return - - # Write pbs script - job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) - script_content = "\n".join(self.pbs_script) - self.write_script(script_content, job_file) - - if process.status != 0: - self._qsub_failed_n += 1 - print(f"\nWARNING: FAILED QSUB, {self._qsub_failed_n} consecutive\n: {process}") - if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: - raise Exception(str(process)) - else: + if len(self._scheduled) <= 0: + return + job_id = "{:04d}".format(self._job_count) + # Create pbs job + pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id, + SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug) + + pbs_process.save_sample_id_job_id(job_id, self._scheduled) + # Write scheduled samples to file + pbs_process.save_scheduled(self._scheduled) + + # Format pbs script + self._create_script() + + if self.pbs_script is None or self._n_samples_in_job == 0: + return + + # Write pbs script + job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) + script_content = "\n".join(self.pbs_script) + self.write_script(script_content, job_file) + + process = self.pbs_commands.qsub([job_file]) + if process.status != 0: + self._qsub_failed_n += 1 + print(f"\nWARNING: FAILED QSUB, {self._qsub_failed_n} consecutive\n: {process}") + if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: + raise Exception(str(process)) + else: # Find all finished jobs self._qsub_failed_n = 0 # Write current job count From e0f7c094ca4065246036e5882d5192555bc10053 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Sun, 22 Jan 2023 17:46:18 +0100 Subject: [PATCH 16/17] Trying to fix file openning in threads. --- mlmc/tool/hdf5.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index fa720ae..e80dfc0 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -2,6 +2,7 @@ import h5py from mlmc.quantity.quantity_spec import ChunkSpec import time +import logging class FileSafe(h5py.File): """ @@ -16,12 +17,13 @@ def __init__(self, filename:str, mode='r', timeout=5, **kwargs): end_time = time.time() + timeout while time.time() < end_time: try: - super().__init__(filename, **kwargs) + super().__init__(filename, mode, **kwargs) return except BlockingIOError as e: time.sleep(0.01) continue break + logging.exception(f"Unable to lock access to HDF5 file: {filename}, give up after: {timeout}s.") raise BlockingIOError(f"Unable to lock access to HDF5 file: {filename}, give up after: {timeout}s.") @@ -118,7 +120,7 @@ def init_header(self, level_parameters): :param level_parameters: MLMC level range of steps :return: None """ - with h5py.File(self.file_name, "a") as hdf_file: + with FileSafe(self.file_name, "a") as hdf_file: # Set global attributes to root group (h5py.Group) hdf_file.attrs['version'] = '1.0.1' hdf_file.attrs['level_parameters'] = level_parameters @@ -135,7 +137,7 @@ def add_level_group(self, level_id): level_group_hdf_path = '/Levels/' + level_id try: - with h5py.File(self.file_name, "a") as hdf_file: + with FileSafe(self.file_name, "a") as hdf_file: # Create group (h5py.Group) if it has not yet been created if level_group_hdf_path not in hdf_file: # Create group for level named by level id (e.g. 0, 1, 2, ...) @@ -250,7 +252,7 @@ def __init__(self, file_name, hdf_group_path, level_id, loaded_from_file=False): # Chunk size and corresponding number of items # Set group attribute 'level_id' - with h5py.File(self.file_name, 'a') as hdf_file: + with FileSafe(self.file_name, 'a') as hdf_file: if 'level_id' not in hdf_file[self.level_group_path].attrs: hdf_file[self.level_group_path].attrs['level_id'] = self.level_id From 30d27639d24a81e2dde7ae85ab5ab8f7720ed5ba Mon Sep 17 00:00:00 2001 From: Pavel Exner Date: Tue, 15 Jul 2025 10:05:48 +0200 Subject: [PATCH 17/17] Minor fix in sample storage for newer numpy. --- mlmc/sample_storage_hdf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlmc/sample_storage_hdf.py b/mlmc/sample_storage_hdf.py index 4ebf7ca..7cdbd63 100644 --- a/mlmc/sample_storage_hdf.py +++ b/mlmc/sample_storage_hdf.py @@ -5,7 +5,7 @@ from mlmc.quantity.quantity_spec import QuantitySpec, ChunkSpec import mlmc.tool.hdf5 as hdf import warnings -warnings.simplefilter("ignore", np.VisibleDeprecationWarning) +warnings.simplefilter("ignore", np.exceptions.VisibleDeprecationWarning) class SampleStorageHDF(SampleStorage):