diff --git a/mlmc/sample_storage.py b/mlmc/sample_storage.py index 01be908..784637f 100644 --- a/mlmc/sample_storage.py +++ b/mlmc/sample_storage.py @@ -171,7 +171,7 @@ def _save_successful(self, samples): res = np.array(res) fine_coarse_res = res[:, 1] - result_type = np.dtype((np.float, np.array(fine_coarse_res[0]).shape)) + result_type = np.dtype((float, np.array(fine_coarse_res[0]).shape)) results = np.empty(shape=(len(res),), dtype=result_type) results[:] = [val for val in fine_coarse_res] diff --git a/mlmc/sample_storage_hdf.py b/mlmc/sample_storage_hdf.py index 7e5fbef..7cdbd63 100644 --- a/mlmc/sample_storage_hdf.py +++ b/mlmc/sample_storage_hdf.py @@ -5,7 +5,7 @@ from mlmc.quantity.quantity_spec import QuantitySpec, ChunkSpec import mlmc.tool.hdf5 as hdf import warnings -warnings.simplefilter("ignore", np.VisibleDeprecationWarning) +warnings.simplefilter("ignore", np.exceptions.VisibleDeprecationWarning) class SampleStorageHDF(SampleStorage): @@ -33,36 +33,14 @@ def __init__(self, file_path): for i_level in range(len(self._hdf_object.level_parameters)): self._level_groups.append(self._hdf_object.add_level_group(str(i_level))) - def _hdf_result_format(self, locations, times): - """ - QuantitySpec data type, necessary for hdf storage - :return: - """ - if len(locations[0]) == 3: - tuple_dtype = np.dtype((np.float, (3,))) - loc_dtype = np.dtype((tuple_dtype, (len(locations),))) - else: - loc_dtype = np.dtype(('S50', (len(locations),))) - - result_dtype = {'names': ('name', 'unit', 'shape', 'times', 'locations'), - 'formats': ('S50', - 'S50', - np.dtype((np.int32, (2,))), - np.dtype((np.float, (len(times),))), - loc_dtype - ) - } - - return result_dtype - def save_global_data(self, level_parameters: List[np.float], result_format: List[QuantitySpec]): + def save_global_data(self, level_parameters: List[float], result_format: List[QuantitySpec]): """ Save hdf5 file global attributes :param level_parameters: list of simulation steps :param result_format: simulation result format :return: None """ - res_dtype = self._hdf_result_format(result_format[0].locations, result_format[0].times) # Create file structure self._hdf_object.create_file_structure(level_parameters) @@ -73,7 +51,7 @@ def save_global_data(self, level_parameters: List[np.float], result_format: List self._level_groups.append(self._hdf_object.add_level_group(str(i_level))) # Save result format (QuantitySpec) - self.save_result_format(result_format, res_dtype) + self.save_result_format(result_format) def load_scheduled_samples(self): """ @@ -85,7 +63,7 @@ def load_scheduled_samples(self): scheduled[int(level.level_id)] = [sample[0].decode() for sample in level.scheduled()] return scheduled - def save_result_format(self, result_format: List[QuantitySpec], res_dtype): + def save_result_format(self, result_format: List[QuantitySpec]): """ Save result format to hdf :param result_format: List[QuantitySpec] @@ -96,20 +74,21 @@ def save_result_format(self, result_format: List[QuantitySpec], res_dtype): raise ValueError('You are setting a new different result format for an existing sample storage') except AttributeError: pass - self._hdf_object.save_result_format(result_format, res_dtype) + self._hdf_object.save_result_format(result_format) + + @staticmethod + def make_qspec(name, unit, shape, times, locations): + return QuantitySpec(name.decode(), unit.decode(), shape, times, [loc.decode() for loc in locations]) def load_result_format(self) -> List[QuantitySpec]: """ Load result format """ results_format = self._hdf_object.load_result_format() - quantities = [] - for res_format in results_format: - spec = QuantitySpec(res_format[0].decode(), res_format[1].decode(), res_format[2], res_format[3], - [loc.decode() for loc in res_format[4]]) - - quantities.append(spec) - + quantities = [ + self.make_qspec(*res_format[0]) + for ispec, res_format in sorted(results_format.items()) + ] return quantities def save_samples(self, successful, failed): @@ -125,7 +104,8 @@ def save_samples(self, successful, failed): def _save_succesful(self, successful_samples): for level, samples in successful_samples.items(): if len(samples) > 0: - self._level_groups[level].append_successful(np.array(samples)) + ids, sample_values = zip(*samples) + self._level_groups[level].append_successful(np.array(ids, dtype=str), np.array(sample_values, dtype=float)) def _save_failed(self, failed_samples): for level, samples in failed_samples.items(): diff --git a/mlmc/sampling_pool.py b/mlmc/sampling_pool.py index 0d40232..df6e0ba 100644 --- a/mlmc/sampling_pool.py +++ b/mlmc/sampling_pool.py @@ -122,6 +122,7 @@ def calculate_sample(sample_id, level_sim, work_dir=None, seed=None): except Exception: str_list = traceback.format_exception(*sys.exc_info()) err_msg = "".join(str_list) + print(err_msg) return sample_id, res, err_msg, running_time @@ -135,6 +136,24 @@ def change_to_sample_directory(work_dir, path: str): sample_dir = os.path.join(work_dir, path) if not os.path.isdir(sample_dir): os.makedirs(sample_dir, mode=0o775, exist_ok=True) + + # We have observed possible problems with directory creation on the + # network filesystem. However it is not sure that was the cause of the problem. + # This code would make sure that we have write access to the created directory. + + # Commented out in order to try to reproduce the problem. + #for i in range(30): + #try: + #with open(os.path.join(sample_dir, "_test_file.txt"), "w") as f: + #f.write("test") + #except: + #time.sleep(1) + #continue + #print("Workdir ready after {i} sleep seconds.") + #break + #else: + #print("Workdir still not ready ready after {i} sleep seconds.") + return sample_dir @staticmethod @@ -241,7 +260,8 @@ def _process_result(self, sample_id, result, err_msg, running_time, level_sim): self._save_running_time(level_sim._level_id, running_time) if not err_msg: - self._queues.setdefault(level_sim._level_id, queue.Queue()).put((sample_id, (result[0], result[1]))) + level_queue = self._queues.setdefault(level_sim._level_id, queue.Queue()) + level_queue.put((sample_id, (result[0], result[1]))) if not self._debug: SamplingPool.move_successful_rm(sample_id, level_sim, output_dir=self._output_dir, dest_dir=self._successful_dir) else: diff --git a/mlmc/sampling_pool_pbs.py b/mlmc/sampling_pool_pbs.py index 01c4128..3df7ed3 100644 --- a/mlmc/sampling_pool_pbs.py +++ b/mlmc/sampling_pool_pbs.py @@ -8,6 +8,7 @@ from mlmc.level_simulation import LevelSimulation from mlmc.sampling_pool import SamplingPool from mlmc.tool.pbs_job import PbsJob +from mlmc.tool.pbs_commands import PbsCommands """ SamplingPoolPBS description @@ -90,6 +91,17 @@ def __init__(self, work_dir, debug=False): self._qsub_failed_n = 0 self._qstat_failed_n = 0 # Number of failed execution of commands qsub, qstat + self._pbs_commands = None + + @property + def pbs_commands(self): + if self._pbs_commands is None: + self._pbs_commands = PbsCommands() + return self._pbs_commands + + @pbs_commands.setter + def pbs_commands(self, pbs_commands_obj): + self._pbs_commands = pbs_commands_obj def _get_job_count(self): """ @@ -200,49 +212,49 @@ def execute(self): Execute pbs script :return: None """ - if len(self._scheduled) > 0: - job_id = "{:04d}".format(self._job_count) - # Create pbs job - pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id, - SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug) - - pbs_process.save_sample_id_job_id(job_id, self._scheduled) - # Write scheduled samples to file - pbs_process.save_scheduled(self._scheduled) - - # Format pbs script - self._create_script() - - if self.pbs_script is None or self._n_samples_in_job == 0: - return + if len(self._scheduled) <= 0: + return + job_id = "{:04d}".format(self._job_count) + # Create pbs job + pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id, + SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug) + + pbs_process.save_sample_id_job_id(job_id, self._scheduled) + # Write scheduled samples to file + pbs_process.save_scheduled(self._scheduled) + + # Format pbs script + self._create_script() + + if self.pbs_script is None or self._n_samples_in_job == 0: + return + + # Write pbs script + job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) + script_content = "\n".join(self.pbs_script) + self.write_script(script_content, job_file) + + process = self.pbs_commands.qsub([job_file]) + if process.status != 0: + self._qsub_failed_n += 1 + print(f"\nWARNING: FAILED QSUB, {self._qsub_failed_n} consecutive\n: {process}") + if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: + raise Exception(str(process)) + else: + # Find all finished jobs + self._qsub_failed_n = 0 + # Write current job count + self._job_count += 1 - # Write pbs script - job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) - script_content = "\n".join(self.pbs_script) - self.write_script(script_content, job_file) + # Get pbs_id from qsub output + pbs_id = process.stdout.split(".")[0] + # Store pbs id for future qstat calls + self._pbs_ids.append(pbs_id) + pbs_process.write_pbs_id(pbs_id) - process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE) - try: - if process.returncode != 0: - raise Exception(process.stderr.decode('ascii')) - # Find all finished jobs - self._qsub_failed_n = 0 - # Write current job count - self._job_count += 1 - - # Get pbs_id from qsub output - pbs_id = process.stdout.decode("ascii").split(".")[0] - # Store pbs id for future qstat calls - self._pbs_ids.append(pbs_id) - pbs_process.write_pbs_id(pbs_id) - - self._current_job_weight = 0 - self._n_samples_in_job = 0 - self._scheduled = [] - except: - self._qsub_failed_n += 1 - if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: - raise Exception(process.stderr.decode("ascii")) + self._current_job_weight = 0 + self._n_samples_in_job = 0 + self._scheduled = [] def _create_script(self): """ @@ -254,7 +266,7 @@ def _create_script(self): self._pbs_config['pbs_output_dir'] = self._jobs_dir self._pbs_config['output_dir'] = self._output_dir self._pbs_config['work_dir'] = self._work_dir - + self.pbs_script = [line.format(**self._pbs_config) for line in self._pbs_header_template] def write_script(self, content, job_file): @@ -286,22 +298,22 @@ def _qstat_pbs_job(self): if len(self._pbs_ids) > 0: # Get PBS id's status, # '-x' - displays status information for finished and moved jobs in addition to queued and running jobs. - qstat_call = ["qstat", "-x"] - qstat_call.extend(self._pbs_ids) + qstat_args = ["-x"] + qstat_args.extend(self._pbs_ids) # qstat call - process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE) + process = self.pbs_commands.qstat(qstat_args) try: - if process.returncode != 0: - raise Exception(process.stderr.decode("ascii")) - output = process.stdout.decode("ascii") + if process.status != 0: + raise Exception(process.stderr) + output = process.stdout # Find all finished jobs finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output) self._qstat_failed_n = 0 except: self._qstat_failed_n += 1 if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N: - raise Exception(process.stderr.decode("ascii")) + raise Exception(process.stderr) finished_pbs_jobs = [] # Get unfinished as diff between planned and finished diff --git a/mlmc/tool/hdf5.py b/mlmc/tool/hdf5.py index f6f3219..e80dfc0 100644 --- a/mlmc/tool/hdf5.py +++ b/mlmc/tool/hdf5.py @@ -1,6 +1,30 @@ import numpy as np import h5py from mlmc.quantity.quantity_spec import ChunkSpec +import time +import logging + +class FileSafe(h5py.File): + """ + Context manager for openning HDF5 files with some timeout + amd retrying of getting acces.creation and usage of a workspace dir. + """ + def __init__(self, filename:str, mode='r', timeout=5, **kwargs): + """ + :param filename: + :param timeout: time to try acquire the lock + """ + end_time = time.time() + timeout + while time.time() < end_time: + try: + super().__init__(filename, mode, **kwargs) + return + except BlockingIOError as e: + time.sleep(0.01) + continue + break + logging.exception(f"Unable to lock access to HDF5 file: {filename}, give up after: {timeout}s.") + raise BlockingIOError(f"Unable to lock access to HDF5 file: {filename}, give up after: {timeout}s.") class HDF5: @@ -96,7 +120,7 @@ def init_header(self, level_parameters): :param level_parameters: MLMC level range of steps :return: None """ - with h5py.File(self.file_name, "a") as hdf_file: + with FileSafe(self.file_name, "a") as hdf_file: # Set global attributes to root group (h5py.Group) hdf_file.attrs['version'] = '1.0.1' hdf_file.attrs['level_parameters'] = level_parameters @@ -112,12 +136,14 @@ def add_level_group(self, level_id): # HDF5 path to particular level group level_group_hdf_path = '/Levels/' + level_id - with h5py.File(self.file_name, "a") as hdf_file: - # Create group (h5py.Group) if it has not yet been created - if level_group_hdf_path not in hdf_file: - # Create group for level named by level id (e.g. 0, 1, 2, ...) - hdf_file['Levels'].create_group(level_id) - + try: + with FileSafe(self.file_name, "a") as hdf_file: + # Create group (h5py.Group) if it has not yet been created + if level_group_hdf_path not in hdf_file: + # Create group for level named by level id (e.g. 0, 1, 2, ...) + hdf_file['Levels'].create_group(level_id) + except BlockingIOError as e: + raise BlockingIOError(f"Unable to lock file: {self.file_name}") return LevelGroup(self.file_name, level_group_hdf_path, level_id, loaded_from_file=self._load_from_file) @property @@ -128,39 +154,50 @@ def result_format_dset_name(self): """ return "result_format" - def save_result_format(self, result_format, res_dtype): + def single_format(self, spec:"QuantitySpec"): + # point or named region + loc_dtype = np.dtype((float, (3,))) if len(spec.locations) == 3 else 'S50' + locations_dtype = np.dtype((loc_dtype, (len(spec.locations),))) + result_dtype = {'names': ('name','unit', 'shape', 'times', 'locations'), + 'formats': ('S50', + 'S50', + np.dtype((np.int32, (2,))), + np.dtype((float, (len(spec.times),))), + locations_dtype + ) + } + format_items = (spec.name, spec.unit, spec.shape, spec.times, spec.locations) + res_format = np.array([format_items], dtype=result_dtype) + return res_format, result_dtype + + + def save_result_format(self, result_format): """ Save result format to dataset :param result_format: List[QuantitySpec] :param res_dtype: result numpy dtype :return: None """ - result_format_dtype = res_dtype - - # Create data set with h5py.File(self.file_name, 'a') as hdf_file: - # Check if dataset exists + # format item in main group if self.result_format_dset_name not in hdf_file: - hdf_file.create_dataset( - self.result_format_dset_name, - shape=(len(result_format),), - dtype=result_format_dtype, - maxshape=(None,), - chunks=True) - - # Format data - result_array = np.empty((len(result_format),), dtype=result_format_dtype) - for res, quantity_spec in zip(result_array, result_format): - for attribute in list(quantity_spec.__dict__.keys()): - if isinstance(getattr(quantity_spec, attribute), (tuple, list)): - res[attribute][:] = getattr(quantity_spec, attribute) + format_group = hdf_file.create_group(self.result_format_dset_name) + else: + format_group = hdf_file[self.result_format_dset_name] + # dataset item qith format spec for every QuantitySpec + for ispec, qspec in enumerate(result_format): + ispec = f"{ispec:04d}" + format, format_dtype = self.single_format(qspec) + if ispec not in format_group: + quantity_dset = format_group.create_dataset( + name=ispec, + shape=(1,), + dtype=format_dtype, + maxshape=(None,), + chunks=True) else: - res[attribute] = getattr(quantity_spec, attribute) - - # Write to file - with h5py.File(self.file_name, 'a') as hdf_file: - dataset = hdf_file[self.result_format_dset_name] - dataset[:] = result_array + quantity_dset = format_group[ispec] + quantity_dset[0] = format def load_result_format(self): """ @@ -171,8 +208,9 @@ def load_result_format(self): if self.result_format_dset_name not in hdf_file: raise AttributeError - dataset = hdf_file[self.result_format_dset_name] - return dataset[()] + format_group = hdf_file[self.result_format_dset_name] + format = {ispec: np.array(q_dset) for ispec, q_dset in format_group.items()} + return format def load_level_parameters(self): with h5py.File(self.file_name, "r") as hdf_file: @@ -214,7 +252,7 @@ def __init__(self, file_name, hdf_group_path, level_id, loaded_from_file=False): # Chunk size and corresponding number of items # Set group attribute 'level_id' - with h5py.File(self.file_name, 'a') as hdf_file: + with FileSafe(self.file_name, 'a') as hdf_file: if 'level_id' not in hdf_file[self.level_group_path].attrs: hdf_file[self.level_group_path].attrs['level_id'] = self.level_id @@ -300,16 +338,17 @@ def append_scheduled(self, scheduled_samples): if len(scheduled_samples) > 0: self._append_dataset(self.scheduled_dset, scheduled_samples) - def append_successful(self, samples: np.array): + def append_successful(self, sample_ids:np.array, samples: np.array): """ Save level samples to datasets (h5py.Dataset), save ids of collected samples and their results :param samples: np.ndarray :return: None """ - self._append_dataset(self.collected_ids_dset, samples[:, 0]) + assert samples.shape[0] == len(sample_ids) + assert samples.shape[1] == 2 + self._append_dataset(self.collected_ids_dset, sample_ids) - values = samples[:, 1] - result_type = np.dtype((np.float, np.array(values[0]).shape)) + result_type = np.dtype((float, np.array(samples[0]).shape)) # Create dataset for failed samples self._make_dataset(name='collected_values', shape=(0,), @@ -317,7 +356,7 @@ def append_successful(self, samples: np.array): chunks=True) d_name = 'collected_values' - self._append_dataset(d_name, [val for val in values]) + self._append_dataset(d_name, [val for val in samples]) def append_failed(self, failed_samples): """ @@ -346,14 +385,15 @@ def scheduled(self): Read level dataset with scheduled samples :return: """ - with h5py.File(self.file_name, 'r') as hdf_file: + with FileSafe(self.file_name, 'r') as hdf_file: scheduled_dset = hdf_file[self.level_group_path][self.scheduled_dset] return scheduled_dset[()] def chunks(self, n_samples=None): with h5py.File(self.file_name, 'r') as hdf_file: if 'collected_values' not in hdf_file[self.level_group_path]: - raise AttributeError("No collected values in level group ".format(self.level_id)) + raise AttributeError(f"No collected values for level {self.level_id} at {self.file_name}:{self.level_group_path}." + f"Found keys: {list(hdf_file[self.level_group_path].keys())}") dataset = hdf_file["/".join([self.level_group_path, "collected_values"])] if n_samples is not None: diff --git a/mlmc/tool/pbs_commands.py b/mlmc/tool/pbs_commands.py new file mode 100644 index 0000000..550f02a --- /dev/null +++ b/mlmc/tool/pbs_commands.py @@ -0,0 +1,50 @@ +import subprocess +import attr +from abc import ABC, abstractmethod +from mlmc.sampling_pool import SamplingPool + + +class PbsCommandsAbstract(ABC): + + @abstractmethod + def qsub(self, args): + """ + :return: CommandOutput instance + """ + + @abstractmethod + def qstat(self, args): + """ + :return: CommandOutput instance + """ + + +class PbsCommands(PbsCommandsAbstract): + + def qsub(self, args): + process = self._run_command("qsub", args) + + return CommandOutput(status=process.returncode, + stdout=process.stdout.decode("ascii"), + stderr=process.stderr.decode("ascii")) + + def qstat(self, args): + process = self._run_command("qstat", args) + + return CommandOutput(status=process.returncode, + stdout=process.stdout.decode("ascii"), + stderr=process.stderr.decode("ascii")) + + @staticmethod + def _run_command(command, args): + return subprocess.run([command, *args], stderr=subprocess.PIPE, stdout=subprocess.PIPE) + + + +@attr.s(auto_attribs=True) +class CommandOutput: + status: int + stdout: str = None + stderr: str = None + + diff --git a/setup.py b/setup.py index 765d315..f9301bd 100644 --- a/setup.py +++ b/setup.py @@ -61,5 +61,5 @@ def read(*names, **kwargs): # include automatically all files in the template MANIFEST.in include_package_data=True, zip_safe=False, - install_requires=['numpy', 'scipy', 'sklearn', 'h5py>=3.1.0', 'ruamel.yaml', 'attrs', 'gstools', 'memoization'], + install_requires=['numpy', 'scipy', 'scikit-learn', 'h5py>=3.1.0', 'ruamel.yaml', 'attrs', 'gstools', 'memoization'], ) diff --git a/test/test_hdf.py b/test/test_hdf.py index 8778a60..12569f4 100644 --- a/test/test_hdf.py +++ b/test/test_hdf.py @@ -87,7 +87,7 @@ def load_from_file(hdf_obj, obligatory_attributes): SCHEDULED_SAMPLES = ['L00_S0000000', 'L00_S0000001', 'L00_S0000002', 'L00_S0000003', 'L00_S0000004'] -RESULT_DATA_DTYPE = [("value", np.float), ("time", np.float)] +RESULT_DATA_DTYPE = [("value", float), ("time", float)] COLLECTED_SAMPLES = np.array([['L00S0000000', (np.array([10, 20]), np.array([5, 6]))], ['L00S0000001', (np.array([1, 2]), np.array([50, 60]))]])