-
Notifications
You must be signed in to change notification settings - Fork 2
Ms endorse #195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Ms endorse #195
Changes from all commits
1c4eea1
f2b5a21
fec6dcf
2bea0ad
618da8a
ce5f8c1
e2ff856
ee0f040
972e778
9e2f88c
1b75cca
492c926
1d755db
8ff30b6
48f04cb
11fcb33
e0f7c09
30d2763
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,7 @@ | |
| from mlmc.quantity.quantity_spec import QuantitySpec, ChunkSpec | ||
| import mlmc.tool.hdf5 as hdf | ||
| import warnings | ||
| warnings.simplefilter("ignore", np.VisibleDeprecationWarning) | ||
| warnings.simplefilter("ignore", np.exceptions.VisibleDeprecationWarning) | ||
|
|
||
|
|
||
| class SampleStorageHDF(SampleStorage): | ||
|
|
@@ -33,36 +33,14 @@ def __init__(self, file_path): | |
| for i_level in range(len(self._hdf_object.level_parameters)): | ||
| self._level_groups.append(self._hdf_object.add_level_group(str(i_level))) | ||
|
|
||
| def _hdf_result_format(self, locations, times): | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simplified, moved into hdf5.py as it is implementation detail how the format is stored.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moereover the format saving was wrong since it same format has been assumed for all dict elements. |
||
| """ | ||
| QuantitySpec data type, necessary for hdf storage | ||
| :return: | ||
| """ | ||
| if len(locations[0]) == 3: | ||
| tuple_dtype = np.dtype((np.float, (3,))) | ||
| loc_dtype = np.dtype((tuple_dtype, (len(locations),))) | ||
| else: | ||
| loc_dtype = np.dtype(('S50', (len(locations),))) | ||
|
|
||
| result_dtype = {'names': ('name', 'unit', 'shape', 'times', 'locations'), | ||
| 'formats': ('S50', | ||
| 'S50', | ||
| np.dtype((np.int32, (2,))), | ||
| np.dtype((np.float, (len(times),))), | ||
| loc_dtype | ||
| ) | ||
| } | ||
|
|
||
| return result_dtype | ||
|
|
||
| def save_global_data(self, level_parameters: List[np.float], result_format: List[QuantitySpec]): | ||
| def save_global_data(self, level_parameters: List[float], result_format: List[QuantitySpec]): | ||
| """ | ||
| Save hdf5 file global attributes | ||
| :param level_parameters: list of simulation steps | ||
| :param result_format: simulation result format | ||
| :return: None | ||
| """ | ||
| res_dtype = self._hdf_result_format(result_format[0].locations, result_format[0].times) | ||
|
|
||
| # Create file structure | ||
| self._hdf_object.create_file_structure(level_parameters) | ||
|
|
@@ -73,7 +51,7 @@ def save_global_data(self, level_parameters: List[np.float], result_format: List | |
| self._level_groups.append(self._hdf_object.add_level_group(str(i_level))) | ||
|
|
||
| # Save result format (QuantitySpec) | ||
| self.save_result_format(result_format, res_dtype) | ||
| self.save_result_format(result_format) | ||
|
|
||
| def load_scheduled_samples(self): | ||
| """ | ||
|
|
@@ -85,7 +63,7 @@ def load_scheduled_samples(self): | |
| scheduled[int(level.level_id)] = [sample[0].decode() for sample in level.scheduled()] | ||
| return scheduled | ||
|
|
||
| def save_result_format(self, result_format: List[QuantitySpec], res_dtype): | ||
| def save_result_format(self, result_format: List[QuantitySpec]): | ||
| """ | ||
| Save result format to hdf | ||
| :param result_format: List[QuantitySpec] | ||
|
|
@@ -96,20 +74,21 @@ def save_result_format(self, result_format: List[QuantitySpec], res_dtype): | |
| raise ValueError('You are setting a new different result format for an existing sample storage') | ||
| except AttributeError: | ||
| pass | ||
| self._hdf_object.save_result_format(result_format, res_dtype) | ||
| self._hdf_object.save_result_format(result_format) | ||
|
|
||
| @staticmethod | ||
| def make_qspec(name, unit, shape, times, locations): | ||
| return QuantitySpec(name.decode(), unit.decode(), shape, times, [loc.decode() for loc in locations]) | ||
|
|
||
| def load_result_format(self) -> List[QuantitySpec]: | ||
| """ | ||
| Load result format | ||
| """ | ||
| results_format = self._hdf_object.load_result_format() | ||
| quantities = [] | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same thing, but using constructing function with named parameters. |
||
| for res_format in results_format: | ||
| spec = QuantitySpec(res_format[0].decode(), res_format[1].decode(), res_format[2], res_format[3], | ||
| [loc.decode() for loc in res_format[4]]) | ||
|
|
||
| quantities.append(spec) | ||
|
|
||
| quantities = [ | ||
| self.make_qspec(*res_format[0]) | ||
| for ispec, res_format in sorted(results_format.items()) | ||
| ] | ||
| return quantities | ||
|
|
||
| def save_samples(self, successful, failed): | ||
|
|
@@ -125,7 +104,8 @@ def save_samples(self, successful, failed): | |
| def _save_succesful(self, successful_samples): | ||
| for level, samples in successful_samples.items(): | ||
| if len(samples) > 0: | ||
| self._level_groups[level].append_successful(np.array(samples)) | ||
| ids, sample_values = zip(*samples) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to make pairs here and convert to numpy arrays just before storage. |
||
| self._level_groups[level].append_successful(np.array(ids, dtype=str), np.array(sample_values, dtype=float)) | ||
|
|
||
| def _save_failed(self, failed_samples): | ||
| for level, samples in failed_samples.items(): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -122,6 +122,7 @@ def calculate_sample(sample_id, level_sim, work_dir=None, seed=None): | |
| except Exception: | ||
| str_list = traceback.format_exception(*sys.exc_info()) | ||
| err_msg = "".join(str_list) | ||
| print(err_msg) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: replace by suitable logging? Only in case of debugging turned on. |
||
|
|
||
| return sample_id, res, err_msg, running_time | ||
|
|
||
|
|
@@ -135,6 +136,24 @@ def change_to_sample_directory(work_dir, path: str): | |
| sample_dir = os.path.join(work_dir, path) | ||
| if not os.path.isdir(sample_dir): | ||
| os.makedirs(sample_dir, mode=0o775, exist_ok=True) | ||
|
|
||
| # We have observed possible problems with directory creation on the | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guessed that there is some problem with NFS, however in fact there was probably problem with various threads writing to the same directory, randomly cleaning it after it was created. |
||
| # network filesystem. However it is not sure that was the cause of the problem. | ||
| # This code would make sure that we have write access to the created directory. | ||
|
|
||
| # Commented out in order to try to reproduce the problem. | ||
| #for i in range(30): | ||
| #try: | ||
| #with open(os.path.join(sample_dir, "_test_file.txt"), "w") as f: | ||
| #f.write("test") | ||
| #except: | ||
| #time.sleep(1) | ||
| #continue | ||
| #print("Workdir ready after {i} sleep seconds.") | ||
| #break | ||
| #else: | ||
| #print("Workdir still not ready ready after {i} sleep seconds.") | ||
|
|
||
| return sample_dir | ||
|
|
||
| @staticmethod | ||
|
|
@@ -241,7 +260,8 @@ def _process_result(self, sample_id, result, err_msg, running_time, level_sim): | |
| self._save_running_time(level_sim._level_id, running_time) | ||
|
|
||
| if not err_msg: | ||
| self._queues.setdefault(level_sim._level_id, queue.Queue()).put((sample_id, (result[0], result[1]))) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just to improve readibility |
||
| level_queue = self._queues.setdefault(level_sim._level_id, queue.Queue()) | ||
| level_queue.put((sample_id, (result[0], result[1]))) | ||
| if not self._debug: | ||
| SamplingPool.move_successful_rm(sample_id, level_sim, output_dir=self._output_dir, dest_dir=self._successful_dir) | ||
| else: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| from mlmc.level_simulation import LevelSimulation | ||
| from mlmc.sampling_pool import SamplingPool | ||
| from mlmc.tool.pbs_job import PbsJob | ||
| from mlmc.tool.pbs_commands import PbsCommands | ||
|
|
||
| """ | ||
| SamplingPoolPBS description | ||
|
|
@@ -90,6 +91,17 @@ def __init__(self, work_dir, debug=False): | |
| self._qsub_failed_n = 0 | ||
| self._qstat_failed_n = 0 | ||
| # Number of failed execution of commands qsub, qstat | ||
| self._pbs_commands = None | ||
|
|
||
| @property | ||
| def pbs_commands(self): | ||
| if self._pbs_commands is None: | ||
| self._pbs_commands = PbsCommands() | ||
| return self._pbs_commands | ||
|
|
||
| @pbs_commands.setter | ||
| def pbs_commands(self, pbs_commands_obj): | ||
| self._pbs_commands = pbs_commands_obj | ||
|
|
||
| def _get_job_count(self): | ||
| """ | ||
|
|
@@ -200,49 +212,49 @@ def execute(self): | |
| Execute pbs script | ||
| :return: None | ||
| """ | ||
| if len(self._scheduled) > 0: | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Decrease indention level |
||
| job_id = "{:04d}".format(self._job_count) | ||
| # Create pbs job | ||
| pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id, | ||
| SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug) | ||
|
|
||
| pbs_process.save_sample_id_job_id(job_id, self._scheduled) | ||
| # Write scheduled samples to file | ||
| pbs_process.save_scheduled(self._scheduled) | ||
|
|
||
| # Format pbs script | ||
| self._create_script() | ||
|
|
||
| if self.pbs_script is None or self._n_samples_in_job == 0: | ||
| return | ||
| if len(self._scheduled) <= 0: | ||
| return | ||
| job_id = "{:04d}".format(self._job_count) | ||
| # Create pbs job | ||
| pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id, | ||
| SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug) | ||
|
|
||
| pbs_process.save_sample_id_job_id(job_id, self._scheduled) | ||
| # Write scheduled samples to file | ||
| pbs_process.save_scheduled(self._scheduled) | ||
|
|
||
| # Format pbs script | ||
| self._create_script() | ||
|
|
||
| if self.pbs_script is None or self._n_samples_in_job == 0: | ||
| return | ||
|
|
||
| # Write pbs script | ||
| job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) | ||
| script_content = "\n".join(self.pbs_script) | ||
| self.write_script(script_content, job_file) | ||
|
|
||
| process = self.pbs_commands.qsub([job_file]) | ||
| if process.status != 0: | ||
| self._qsub_failed_n += 1 | ||
| print(f"\nWARNING: FAILED QSUB, {self._qsub_failed_n} consecutive\n: {process}") | ||
| if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: | ||
| raise Exception(str(process)) | ||
| else: | ||
| # Find all finished jobs | ||
| self._qsub_failed_n = 0 | ||
| # Write current job count | ||
| self._job_count += 1 | ||
|
|
||
| # Write pbs script | ||
| job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id)) | ||
| script_content = "\n".join(self.pbs_script) | ||
| self.write_script(script_content, job_file) | ||
| # Get pbs_id from qsub output | ||
| pbs_id = process.stdout.split(".")[0] | ||
| # Store pbs id for future qstat calls | ||
| self._pbs_ids.append(pbs_id) | ||
| pbs_process.write_pbs_id(pbs_id) | ||
|
|
||
| process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE) | ||
| try: | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I quite didn't get reason for this try block. What is reason for PIPEs? I propose to just let subproces.run finish and take resulting stdout and stderr. |
||
| if process.returncode != 0: | ||
| raise Exception(process.stderr.decode('ascii')) | ||
| # Find all finished jobs | ||
| self._qsub_failed_n = 0 | ||
| # Write current job count | ||
| self._job_count += 1 | ||
|
|
||
| # Get pbs_id from qsub output | ||
| pbs_id = process.stdout.decode("ascii").split(".")[0] | ||
| # Store pbs id for future qstat calls | ||
| self._pbs_ids.append(pbs_id) | ||
| pbs_process.write_pbs_id(pbs_id) | ||
|
|
||
| self._current_job_weight = 0 | ||
| self._n_samples_in_job = 0 | ||
| self._scheduled = [] | ||
| except: | ||
| self._qsub_failed_n += 1 | ||
| if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N: | ||
| raise Exception(process.stderr.decode("ascii")) | ||
| self._current_job_weight = 0 | ||
| self._n_samples_in_job = 0 | ||
| self._scheduled = [] | ||
|
|
||
| def _create_script(self): | ||
| """ | ||
|
|
@@ -254,7 +266,7 @@ def _create_script(self): | |
| self._pbs_config['pbs_output_dir'] = self._jobs_dir | ||
| self._pbs_config['output_dir'] = self._output_dir | ||
| self._pbs_config['work_dir'] = self._work_dir | ||
|
|
||
| self.pbs_script = [line.format(**self._pbs_config) for line in self._pbs_header_template] | ||
|
|
||
| def write_script(self, content, job_file): | ||
|
|
@@ -286,22 +298,22 @@ def _qstat_pbs_job(self): | |
| if len(self._pbs_ids) > 0: | ||
| # Get PBS id's status, | ||
| # '-x' - displays status information for finished and moved jobs in addition to queued and running jobs. | ||
| qstat_call = ["qstat", "-x"] | ||
| qstat_call.extend(self._pbs_ids) | ||
| qstat_args = ["-x"] | ||
| qstat_args.extend(self._pbs_ids) | ||
|
|
||
| # qstat call | ||
| process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE) | ||
| process = self.pbs_commands.qstat(qstat_args) | ||
| try: | ||
| if process.returncode != 0: | ||
| raise Exception(process.stderr.decode("ascii")) | ||
| output = process.stdout.decode("ascii") | ||
| if process.status != 0: | ||
| raise Exception(process.stderr) | ||
| output = process.stdout | ||
| # Find all finished jobs | ||
| finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output) | ||
| self._qstat_failed_n = 0 | ||
| except: | ||
| self._qstat_failed_n += 1 | ||
| if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N: | ||
| raise Exception(process.stderr.decode("ascii")) | ||
| raise Exception(process.stderr) | ||
| finished_pbs_jobs = [] | ||
|
|
||
| # Get unfinished as diff between planned and finished | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reason: float seems to be incompatible with new Numpy versions