Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mlmc/sample_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _save_successful(self, samples):
res = np.array(res)
fine_coarse_res = res[:, 1]

result_type = np.dtype((np.float, np.array(fine_coarse_res[0]).shape))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason: np.float was deprecated and removed in newer NumPy versions; the builtin float is used instead.

result_type = np.dtype((float, np.array(fine_coarse_res[0]).shape))
results = np.empty(shape=(len(res),), dtype=result_type)
results[:] = [val for val in fine_coarse_res]

Expand Down
50 changes: 15 additions & 35 deletions mlmc/sample_storage_hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from mlmc.quantity.quantity_spec import QuantitySpec, ChunkSpec
import mlmc.tool.hdf5 as hdf
import warnings
warnings.simplefilter("ignore", np.VisibleDeprecationWarning)
warnings.simplefilter("ignore", np.exceptions.VisibleDeprecationWarning)


class SampleStorageHDF(SampleStorage):
Expand Down Expand Up @@ -33,36 +33,14 @@ def __init__(self, file_path):
for i_level in range(len(self._hdf_object.level_parameters)):
self._level_groups.append(self._hdf_object.add_level_group(str(i_level)))

def _hdf_result_format(self, locations, times):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified, moved into hdf5.py as it is implementation detail how the format is stored.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover, the format saving was wrong, since the same format was assumed for all dict elements.

"""
QuantitySpec data type, necessary for hdf storage
:return:
"""
if len(locations[0]) == 3:
tuple_dtype = np.dtype((np.float, (3,)))
loc_dtype = np.dtype((tuple_dtype, (len(locations),)))
else:
loc_dtype = np.dtype(('S50', (len(locations),)))

result_dtype = {'names': ('name', 'unit', 'shape', 'times', 'locations'),
'formats': ('S50',
'S50',
np.dtype((np.int32, (2,))),
np.dtype((np.float, (len(times),))),
loc_dtype
)
}

return result_dtype

def save_global_data(self, level_parameters: List[np.float], result_format: List[QuantitySpec]):
def save_global_data(self, level_parameters: List[float], result_format: List[QuantitySpec]):
"""
Save hdf5 file global attributes
:param level_parameters: list of simulation steps
:param result_format: simulation result format
:return: None
"""
res_dtype = self._hdf_result_format(result_format[0].locations, result_format[0].times)

# Create file structure
self._hdf_object.create_file_structure(level_parameters)
Expand All @@ -73,7 +51,7 @@ def save_global_data(self, level_parameters: List[np.float], result_format: List
self._level_groups.append(self._hdf_object.add_level_group(str(i_level)))

# Save result format (QuantitySpec)
self.save_result_format(result_format, res_dtype)
self.save_result_format(result_format)

def load_scheduled_samples(self):
"""
Expand All @@ -85,7 +63,7 @@ def load_scheduled_samples(self):
scheduled[int(level.level_id)] = [sample[0].decode() for sample in level.scheduled()]
return scheduled

def save_result_format(self, result_format: List[QuantitySpec], res_dtype):
def save_result_format(self, result_format: List[QuantitySpec]):
"""
Save result format to hdf
:param result_format: List[QuantitySpec]
Expand All @@ -96,20 +74,21 @@ def save_result_format(self, result_format: List[QuantitySpec], res_dtype):
raise ValueError('You are setting a new different result format for an existing sample storage')
except AttributeError:
pass
self._hdf_object.save_result_format(result_format, res_dtype)
self._hdf_object.save_result_format(result_format)

@staticmethod
def make_qspec(name, unit, shape, times, locations):
    """Build a QuantitySpec from raw HDF5 values, decoding the byte strings."""
    decoded_locations = [location.decode() for location in locations]
    return QuantitySpec(name.decode(), unit.decode(), shape, times, decoded_locations)

def load_result_format(self) -> List[QuantitySpec]:
"""
Load result format
"""
results_format = self._hdf_object.load_result_format()
quantities = []
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same change, but using a construction helper function with named parameters.

for res_format in results_format:
spec = QuantitySpec(res_format[0].decode(), res_format[1].decode(), res_format[2], res_format[3],
[loc.decode() for loc in res_format[4]])

quantities.append(spec)

quantities = [
self.make_qspec(*res_format[0])
for ispec, res_format in sorted(results_format.items())
]
return quantities

def save_samples(self, successful, failed):
Expand All @@ -125,7 +104,8 @@ def save_samples(self, successful, failed):
def _save_succesful(self, successful_samples):
for level, samples in successful_samples.items():
if len(samples) > 0:
self._level_groups[level].append_successful(np.array(samples))
ids, sample_values = zip(*samples)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to form (id, value) pairs here and convert them to NumPy arrays just before storage.

self._level_groups[level].append_successful(np.array(ids, dtype=str), np.array(sample_values, dtype=float))

def _save_failed(self, failed_samples):
for level, samples in failed_samples.items():
Expand Down
22 changes: 21 additions & 1 deletion mlmc/sampling_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def calculate_sample(sample_id, level_sim, work_dir=None, seed=None):
except Exception:
str_list = traceback.format_exception(*sys.exc_info())
err_msg = "".join(str_list)
print(err_msg)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: replace with suitable logging? Only when debugging is turned on.
The problem is that exceptions may not be propagated to the Sampler if the pool somehow fails. So it would be great to log the exception within the sampling job process.


return sample_id, res, err_msg, running_time

Expand All @@ -135,6 +136,24 @@ def change_to_sample_directory(work_dir, path: str):
sample_dir = os.path.join(work_dir, path)
if not os.path.isdir(sample_dir):
os.makedirs(sample_dir, mode=0o775, exist_ok=True)

# We have observed possible problems with directory creation on the
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guessed that there was some problem with NFS; however, the actual problem was probably various threads writing to the same directory, randomly cleaning it after it was created.
I suggest keeping the code commented out: it allows us to better check that we are able to write into the directory. There could be a problem with access rights anyway.

# network filesystem. However it is not sure that was the cause of the problem.
# This code would make sure that we have write access to the created directory.

# Commented out in order to try to reproduce the problem.
#for i in range(30):
#try:
#with open(os.path.join(sample_dir, "_test_file.txt"), "w") as f:
#f.write("test")
#except:
#time.sleep(1)
#continue
#print("Workdir ready after {i} sleep seconds.")
#break
#else:
#print("Workdir still not ready after {i} sleep seconds.")

return sample_dir

@staticmethod
Expand Down Expand Up @@ -241,7 +260,8 @@ def _process_result(self, sample_id, result, err_msg, running_time, level_sim):
self._save_running_time(level_sim._level_id, running_time)

if not err_msg:
self._queues.setdefault(level_sim._level_id, queue.Queue()).put((sample_id, (result[0], result[1])))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to improve readability.

level_queue = self._queues.setdefault(level_sim._level_id, queue.Queue())
level_queue.put((sample_id, (result[0], result[1])))
if not self._debug:
SamplingPool.move_successful_rm(sample_id, level_sim, output_dir=self._output_dir, dest_dir=self._successful_dir)
else:
Expand Down
110 changes: 61 additions & 49 deletions mlmc/sampling_pool_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from mlmc.level_simulation import LevelSimulation
from mlmc.sampling_pool import SamplingPool
from mlmc.tool.pbs_job import PbsJob
from mlmc.tool.pbs_commands import PbsCommands

"""
SamplingPoolPBS description
Expand Down Expand Up @@ -90,6 +91,17 @@ def __init__(self, work_dir, debug=False):
self._qsub_failed_n = 0
self._qstat_failed_n = 0
# Number of failed execution of commands qsub, qstat
self._pbs_commands = None

@property
def pbs_commands(self):
    """Return the PbsCommands instance used for qsub/qstat calls, creating it lazily on first access."""
    commands = self._pbs_commands
    if commands is None:
        commands = PbsCommands()
        self._pbs_commands = commands
    return commands

@pbs_commands.setter
def pbs_commands(self, pbs_commands_obj):
    """Inject a replacement PbsCommands object (e.g. a stub for testing)."""
    self._pbs_commands = pbs_commands_obj

def _get_job_count(self):
"""
Expand Down Expand Up @@ -200,49 +212,49 @@ def execute(self):
Execute pbs script
:return: None
"""
if len(self._scheduled) > 0:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decrease indentation level.

job_id = "{:04d}".format(self._job_count)
# Create pbs job
pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id,
SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug)

pbs_process.save_sample_id_job_id(job_id, self._scheduled)
# Write scheduled samples to file
pbs_process.save_scheduled(self._scheduled)

# Format pbs script
self._create_script()

if self.pbs_script is None or self._n_samples_in_job == 0:
return
if len(self._scheduled) <= 0:
return
job_id = "{:04d}".format(self._job_count)
# Create pbs job
pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id,
SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug)

pbs_process.save_sample_id_job_id(job_id, self._scheduled)
# Write scheduled samples to file
pbs_process.save_scheduled(self._scheduled)

# Format pbs script
self._create_script()

if self.pbs_script is None or self._n_samples_in_job == 0:
return

# Write pbs script
job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id))
script_content = "\n".join(self.pbs_script)
self.write_script(script_content, job_file)

process = self.pbs_commands.qsub([job_file])
if process.status != 0:
self._qsub_failed_n += 1
print(f"\nWARNING: FAILED QSUB, {self._qsub_failed_n} consecutive\n: {process}")
if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N:
raise Exception(str(process))
else:
# Find all finished jobs
self._qsub_failed_n = 0
# Write current job count
self._job_count += 1

# Write pbs script
job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id))
script_content = "\n".join(self.pbs_script)
self.write_script(script_content, job_file)
# Get pbs_id from qsub output
pbs_id = process.stdout.split(".")[0]
# Store pbs id for future qstat calls
self._pbs_ids.append(pbs_id)
pbs_process.write_pbs_id(pbs_id)

process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE)
try:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite get the reason for this try block. What is the reason for the PIPEs? I propose to just let subprocess.run finish and take the resulting stdout and stderr.

if process.returncode != 0:
raise Exception(process.stderr.decode('ascii'))
# Find all finished jobs
self._qsub_failed_n = 0
# Write current job count
self._job_count += 1

# Get pbs_id from qsub output
pbs_id = process.stdout.decode("ascii").split(".")[0]
# Store pbs id for future qstat calls
self._pbs_ids.append(pbs_id)
pbs_process.write_pbs_id(pbs_id)

self._current_job_weight = 0
self._n_samples_in_job = 0
self._scheduled = []
except:
self._qsub_failed_n += 1
if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N:
raise Exception(process.stderr.decode("ascii"))
self._current_job_weight = 0
self._n_samples_in_job = 0
self._scheduled = []

def _create_script(self):
"""
Expand All @@ -254,7 +266,7 @@ def _create_script(self):
self._pbs_config['pbs_output_dir'] = self._jobs_dir
self._pbs_config['output_dir'] = self._output_dir
self._pbs_config['work_dir'] = self._work_dir

self.pbs_script = [line.format(**self._pbs_config) for line in self._pbs_header_template]

def write_script(self, content, job_file):
Expand Down Expand Up @@ -286,22 +298,22 @@ def _qstat_pbs_job(self):
if len(self._pbs_ids) > 0:
# Get PBS id's status,
# '-x' - displays status information for finished and moved jobs in addition to queued and running jobs.
qstat_call = ["qstat", "-x"]
qstat_call.extend(self._pbs_ids)
qstat_args = ["-x"]
qstat_args.extend(self._pbs_ids)

# qstat call
process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
process = self.pbs_commands.qstat(qstat_args)
try:
if process.returncode != 0:
raise Exception(process.stderr.decode("ascii"))
output = process.stdout.decode("ascii")
if process.status != 0:
raise Exception(process.stderr)
output = process.stdout
# Find all finished jobs
finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output)
self._qstat_failed_n = 0
except:
self._qstat_failed_n += 1
if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N:
raise Exception(process.stderr.decode("ascii"))
raise Exception(process.stderr)
finished_pbs_jobs = []

# Get unfinished as diff between planned and finished
Expand Down
Loading
Loading