Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 178 additions & 33 deletions src/common/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ class ResultReader(abc.ABC):
- (optional) get_trajectories(self, names) -> dict(name, Trajectory) for variables in <names>.
default: Loop over get_trajectory(name).
"""

def _get_name(self) -> list[str]:
warnings.warn(
"Getting variable names via the `name` attribute is deprecated and will be removed in a future version."
"Use the `get_variable_names` function instead.",
DeprecationWarning,
stacklevel=2
)
return self.get_variable_names()

name = property(fget = _get_name)

@abc.abstractmethod
def get_variable_names(self) -> list[str]:
Expand Down Expand Up @@ -214,17 +225,6 @@ class ResultDymola(ResultReader):
Base class for representation of a result file.
"""

def _get_name(self) -> list[str]:
warnings.warn(
"Getting variable names via the `name` attribute is deprecated and will be removed in a future version."
"Use the `get_variable_names` function instead.",
DeprecationWarning,
stacklevel=2
)
return self.get_variable_names()

name = property(fget = _get_name)

def get_variable_names(self) -> list[str]:
return [decode(n) for n in self.name_lookup.keys()]

Expand Down Expand Up @@ -1124,6 +1124,11 @@ def is_variable(self, name):
name --
Name of the variable/parameter/constant

Raises::

VariableNotFoundError --
If variable does not exists.

Returns::

True if the variable is time-varying.
Expand Down Expand Up @@ -1783,7 +1788,7 @@ def _get_calculated_diagnostics_cpu_time(self) -> np.ndarray:
"""Get trajectory values for cumulative CPU time."""
return DynamicDiagnosticsUtils.get_cpu_time(self.get_trajectory(f'{DIAGNOSTICS_PREFIX}cpu_time_per_step').x)

def is_variable(self, name):
def is_variable(self, name: str) -> bool:
"""
Returns True if the given name corresponds to a time-varying variable.

Expand All @@ -1792,6 +1797,11 @@ def is_variable(self, name):
name --
Name of the variable/parameter/constant.

Raises::

VariableNotFoundError --
If variable does not exists.

Returns::

True if the variable is time-varying.
Expand Down Expand Up @@ -3224,8 +3234,15 @@ def _data_1(self):
return scipy.io.loadmat(self._fname, chars_as_strings=False, variable_names=["data_1"])["data_1"]

def get_trajectory(self, name: str) -> Trajectory:
return self._get_trajectory(name)

def _get_trajectory(self, name: str, start_index: int = 0, stop_index: int | None = None) -> Trajectory:
time = self._diagnostics_time_vector if self._contains_diagnostic_data else self._time_vector

if start_index != 0 or stop_index is not None:
start_index, stop_index = _clamp_indicies(len(time), start_index, stop_index)
time = time[start_index: stop_index]

if name in ("time", "Time"):
return Trajectory(time, time)

Expand All @@ -3235,55 +3252,103 @@ def get_trajectory(self, name: str) -> Trajectory:
return Trajectory(self._data_1[0], self._data_1[data_index])
case 2:
data = (
self._get_interpolated_trajectory(data_index)
self._get_interpolated_trajectory(data_index, start_index, stop_index)
if self._contains_diagnostic_data
else self._get_trajectory(data_index)
else self._get_primary_trajectory(data_index, start_index, stop_index)
)
return Trajectory(time, data)
case 3:
return Trajectory(time, self._get_diagnostics_trajectory(data_index))
return Trajectory(time, self._get_diagnostics_trajectory(data_index, start_index, stop_index))
case _:
raise ValueError(f"Invalid data matrix: {data_mat}")

@cached_property
def _time_vector(self) -> np.ndarray:
return self._get_trajectory(0)
return self._get_primary_trajectory(0)

@cached_property
def _diagnostics_time_vector(self) -> np.ndarray:
return self._get_diagnostics_trajectory(0)

def _get_trajectory(self, data_index: int) -> np.ndarray:
return fmi_util.read_trajectory(
encode(self._fname),
data_index,
self._data_2_info["file_position"],
self._data_2_info["sizeof_type"],
int(self._data_2_info["nbr_points"]),
int(self._data_2_info["nbr_variables"])
)
def _get_primary_trajectory(self, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
return self._get_trajectory_from_data(self._data_2_info, data_index, start_index, stop_index)

def _get_diagnostics_trajectory(self, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
return self._get_trajectory_from_data(self._data_3_info, data_index, start_index, stop_index)

def _get_trajectory_from_data(self, data_info, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
file_position = data_info["file_position"]
sizeof_type = data_info["sizeof_type"]
nbr_points = data_info["nbr_points"]
nbr_variables = data_info["nbr_variables"]

start_index, stop_index = _clamp_indicies(nbr_points, start_index, stop_index)
new_file_position = file_position + start_index * sizeof_type * nbr_variables
new_nbr_points = stop_index - start_index

def _get_diagnostics_trajectory(self, data_index: int) -> np.ndarray:
return fmi_util.read_trajectory(
encode(self._fname),
data_index,
self._data_3_info["file_position"],
self._data_3_info["sizeof_type"],
int(self._data_3_info["nbr_points"]),
int(self._data_3_info["nbr_variables"])
new_file_position,
sizeof_type,
int(new_nbr_points),
int(nbr_variables)
)

def _get_interpolated_trajectory(self, data_index: int) -> np.ndarray:
def _get_interpolated_trajectory(self, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
time_vector = self._time_vector
diag_time_vector = self._diagnostics_time_vector
data = self._get_trajectory(data_index)
data = self._get_primary_trajectory(data_index)

if len(data) == 1:
return data

f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate")

start_index, stop_index = _clamp_indicies(self._data_3_info["nbr_points"], start_index, stop_index)
diag_time_vector = self._diagnostics_time_vector[start_index: stop_index]
return f(diag_time_vector)

def is_variable(self, name: str) -> bool:
if name == 'time' or name== 'Time':
return True

data_index, _ = self._get_data_index_mat(name)
return data_index != 1

def get_variables_data(
self,
names: list[str],
start_index: int = 0,
stop_index: int | None = None,
) -> tuple[dict[str, Trajectory], Union[int, None]]:
if isinstance(start_index, int) and isinstance(stop_index, int) and stop_index < start_index:
raise ValueError(f"Invalid values for {start_index=} and {stop_index=}, " + \
"'start_index' needs to be less than or equal to 'stop_index'.")

trajectories = {name: self._get_trajectory(name, start_index, stop_index) for name in names}
largest_trajectory_length = self._find_max_trajectory_length(trajectories)
new_start_index = (start_index + largest_trajectory_length) if trajectories else start_index
return trajectories, new_start_index

def _find_max_trajectory_length(self, trajectories: dict[str, Trajectory]) -> int:
"""
Given a dict of trajectories, find the length of the largest trajectory
among the set of continuous variables. We disregard parameters/constants since they are not stored
with the same amount of data points as trajectories for continuous variables.
"""
return max([0] + [len(t.x) for v, t in trajectories.items() if self.is_variable(v)])


def _clamp_indicies(nbr_points: int, start_index: int, stop_index: int | None):
# Accounts for sub-sets of data
start_index = max(0, start_index)
stop_index = max(0, nbr_points if stop_index is None else min(nbr_points, stop_index))

# Finally when stop_index = None, we can end up with start > stop,
# therefore we need to use min(start, stop)
start_index = min(start_index, stop_index)

return start_index, stop_index

class ResultReaderBinaryMat(ResultReader):
def __init__(self, fname, allow_file_updates=False):
Expand Down Expand Up @@ -3331,14 +3396,94 @@ def _get_delegate(self, fname, allow_file_updates: bool):
)

def get_variable_names(self) -> list[str]:
"""Retrieve the names of the variables stored in this class."""
return self._delegate.get_variable_names()

def get_trajectory(self, name: str) -> Trajectory:
"""Retrieve a single pyfmi.common.io.Trajectory by variables name.
Parameters::

name --
String of variable name.
Return::
pyfmi.common.io.Trajectory corresponding to variable_name.
"""
return self._delegate.get_trajectory(name)

def get_trajectories(self, names: list[str]) -> dict[str, Trajectory]:
"""Retrieve multiple trajectories, returns a dictionary of variables trajectories.
names --
List of Strings of variable names.
Return::
Dictionary: {variable_name: pyfmi.common.io.Trajectory}.
"""
return self._delegate.get_trajectories(names)

def is_variable(self, name: str) -> bool:
"""
Returns True if the given name corresponds to a time-varying variable.

Parameters::

name --
Name of the variable/parameter/constant.

Raises::

VariableNotFoundError --
If variable does not exists.

Returns::

True if the variable is time-varying.
"""
return self._delegate.is_variable(name)

def get_variables_data(
self,
names: list[str],
start_index: int = 0,
stop_index: int | None = None,
) -> tuple[dict[str, Trajectory], Union[int, None]]:
""""
Returns trajectories for each variable in 'names' with lengths adjusted for the
interval [start_index, stop_index], i.e. partial trajectories.
Improper values for start_index and stop_index that are out of bounds are automatically corrected,
such that:
Negative values are always adjusted to 0 or larger.
Out of bounds for stop_index is adjusted for the number of available data points, example:
If start_index = 0, stop_index = 5 and there are only 3 data points available,
then returned trajectories are of length 3.
If start_index is larger than or equal to the number of available data points, empty trajectories
are returned, i.e. trajectories of length 0.
Note that trajectories for parameters are always of length 2 if indices 0 and 1 are
part of the requested trajectory since they reflect the values of before and after initialization.
Therefore if you request a trajectory for a parameter with start_index>=2, returned trajectory is empty.

By default, start_index = 0 and stop_index = None, which implies that the full trajectory is returned.

Parameters::

names --
List of variables names for which to fetch trajectories.

start_index --
The index from where the trajectory data starts from.

stop_index --
The index from where the trajectory data ends. If stop_index is set to None,
it implies that all data in the slice [start_index:] is returned.

Raises::
ValueError -- If stop_index < start_index.

Returns::
Tuple: (dict of trajectories with keys corresponding to variable names, next start index (non-negative))
"""
return self._delegate.get_variables_data(
names, start_index=start_index, stop_index=stop_index
)


def verify_result_size(file_name, first_point, current_size, previous_size, max_size, ncp, time):
free_space = get_available_disk_space(file_name)
Expand Down
Loading