From 353c5d6f9c26198dec1457c1d3a4fb14a50fc5c7 Mon Sep 17 00:00:00 2001 From: coder Date: Sun, 8 Feb 2026 08:32:36 -0800 Subject: [PATCH] add fmi3 --- src/fmuloader/fmi3.py | 2324 +++++++++++++++++++++++++++++++++++++++++ tests/test_fmi3.py | 1063 +++++++++++++++++++ 2 files changed, 3387 insertions(+) create mode 100644 src/fmuloader/fmi3.py create mode 100644 tests/test_fmi3.py diff --git a/src/fmuloader/fmi3.py b/src/fmuloader/fmi3.py new file mode 100644 index 0000000..cd18e66 --- /dev/null +++ b/src/fmuloader/fmi3.py @@ -0,0 +1,2324 @@ +""" +Python ctypes bindings for the FMI 3.0 standard. + +This module provides complete Python bindings for loading and interacting with +FMI 3.0 Functional Mock-up Units (FMUs), supporting Co-Simulation, Model +Exchange, and Scheduled Execution interfaces. + +Reference: FMI Specification 3.0.2 +""" + +from __future__ import annotations + +import ctypes +import platform +import struct +import tempfile +import zipfile +from ctypes import ( + CDLL, + CFUNCTYPE, + POINTER, + byref, + c_bool, + c_char, + c_char_p, + c_double, + c_float, + c_int, + c_int8, + c_int16, + c_int32, + c_int64, + c_size_t, + c_uint8, + c_uint16, + c_uint32, + c_uint64, + c_void_p, +) +from enum import IntEnum +from pathlib import Path +from typing import Any, Sequence + +# --------------------------------------------------------------------------- +# FMI 3.0 primitive types (mirrors fmi3PlatformTypes.h) +# --------------------------------------------------------------------------- +fmi3Instance = c_void_p +fmi3InstanceEnvironment = c_void_p +fmi3FMUState = c_void_p +fmi3ValueReference = c_uint32 + +fmi3Float32 = c_float +fmi3Float64 = c_double +fmi3Int8 = c_int8 +fmi3UInt8 = c_uint8 +fmi3Int16 = c_int16 +fmi3UInt16 = c_uint16 +fmi3Int32 = c_int32 +fmi3UInt32 = c_uint32 +fmi3Int64 = c_int64 +fmi3UInt64 = c_uint64 +fmi3Boolean = c_bool +fmi3Char = c_char +fmi3String = c_char_p +fmi3Byte = c_uint8 +fmi3Binary = POINTER(c_uint8) +fmi3Clock = c_bool + +fmi3True: bool = True +fmi3False: bool = False +fmi3ClockActive: bool = True +fmi3ClockInactive: bool = False + + +# --------------------------------------------------------------------------- +# Enumerations +# --------------------------------------------------------------------------- +class Fmi3Status(IntEnum): + OK = 0 + WARNING = 1 + DISCARD = 2 + ERROR = 3 + FATAL = 4 + + +class Fmi3Type(IntEnum): + MODEL_EXCHANGE = 0 + CO_SIMULATION = 1 + SCHEDULED_EXECUTION = 2 + + +class Fmi3DependencyKind(IntEnum): + INDEPENDENT = 0 + CONSTANT = 1 + FIXED = 2 + TUNABLE = 3 + DISCRETE = 4 + DEPENDENT = 5 + + +class Fmi3IntervalQualifier(IntEnum): + INTERVAL_NOT_YET_KNOWN = 0 + INTERVAL_UNCHANGED = 1 + INTERVAL_CHANGED = 2 + + +# --------------------------------------------------------------------------- +# Callback function types +# --------------------------------------------------------------------------- +# void fmi3LogMessageCallback(fmi3InstanceEnvironment, fmi3Status, fmi3String, +# fmi3String) +_fmi3LogMessageCallback = CFUNCTYPE( + None, + fmi3InstanceEnvironment, + c_int, # fmi3Status + fmi3String, + fmi3String, +) + +# void fmi3ClockUpdateCallback(fmi3InstanceEnvironment) +_fmi3ClockUpdateCallback = CFUNCTYPE(None, fmi3InstanceEnvironment) + +# void fmi3IntermediateUpdateCallback( +# fmi3InstanceEnvironment, fmi3Float64, fmi3Boolean, fmi3Boolean, +# fmi3Boolean, fmi3Boolean, fmi3Boolean*, fmi3Float64*) +_fmi3IntermediateUpdateCallback = CFUNCTYPE( + None, + fmi3InstanceEnvironment, + fmi3Float64, + fmi3Boolean, + fmi3Boolean, + fmi3Boolean, + fmi3Boolean, + POINTER(fmi3Boolean), + POINTER(fmi3Float64), +) + +# void fmi3LockPreemptionCallback(void) +_fmi3LockPreemptionCallback = CFUNCTYPE(None) + +# void fmi3UnlockPreemptionCallback(void) +_fmi3UnlockPreemptionCallback = CFUNCTYPE(None) + + +# --------------------------------------------------------------------------- +# Default callbacks +# --------------------------------------------------------------------------- +def _default_logger( + _env: object, + status: int, + category: bytes | None, + message: bytes | None, +) -> None: + cat = category.decode() if category else "" + msg = message.decode() if message else "" + status_str = Fmi3Status(status).name + print(f"[{status_str}] [{cat}] {msg}") + + +_LOGGER_FUNC = _fmi3LogMessageCallback(_default_logger) + + +# --------------------------------------------------------------------------- +# Shared library helpers +# --------------------------------------------------------------------------- +def _shared_lib_extension() -> str: + s = platform.system() + if s == "Windows": + return ".dll" + if s == "Darwin": + return ".dylib" + return ".so" + + +def _platform_folder() -> str: + """Return the FMI 3.0 platform tuple, e.g. ``'x86_64-darwin'``. + + The FMI 3.0 standard defines platform tuples of the form + ``-``. Common examples: + + * ``x86_64-darwin``, ``aarch64-darwin`` + * ``x86_64-linux``, ``x86-linux``, ``aarch64-linux`` + * ``x86_64-windows``, ``x86-windows`` + """ + machine = platform.machine().lower() + s = platform.system() + + # Map Python machine names → FMI 3.0 architecture names + arch_map: dict[str, str] = { + "x86_64": "x86_64", + "amd64": "x86_64", + "i386": "x86", + "i686": "x86", + "x86": "x86", + "aarch64": "aarch64", + "arm64": "aarch64", + "armv7l": "aarch32", + "armv6l": "aarch32", + } + arch = arch_map.get(machine) + if arch is None: + # Fallback: use pointer size to guess x86 vs x86_64 + if struct.calcsize("P") * 8 == 64: + arch = "x86_64" + else: + arch = "x86" + + if s == "Darwin": + return f"{arch}-darwin" + if s == "Linux": + return f"{arch}-linux" + if s == "Windows": + return f"{arch}-windows" + raise RuntimeError(f"Unsupported platform: {s} ({machine})") + + +def _find_binary( + binaries_dir: Path, + model_identifier: str, + binary_dir: str | None = None, +) -> Path: + """Locate the shared library in the binaries/ directory. + + Args: + binaries_dir: The ``binaries/`` directory inside an extracted FMU. + model_identifier: The model identifier (shared lib name without + extension). + binary_dir: Optional override for the platform subfolder name. + When *None*, the standard FMI 3.0 platform tuple is used. + """ + ext = _shared_lib_extension() + lib_name = model_identifier + ext + + # Try the user-provided or standard folder + folder = binary_dir if binary_dir is not None else _platform_folder() + candidate = binaries_dir / folder / lib_name + if candidate.exists(): + return candidate + + # Fallback: scan all sub-directories + for found in binaries_dir.rglob(lib_name): + return found + + raise FileNotFoundError( + f"Cannot find shared library {lib_name!r} under {binaries_dir}" + ) + + +# --------------------------------------------------------------------------- +# FMI 3.0 error checking +# --------------------------------------------------------------------------- +class Fmi3Error(Exception): + """Raised when an FMI 3.0 function returns an error status.""" + + def __init__(self, func_name: str, status: Fmi3Status) -> None: + self.func_name = func_name + self.status = status + super().__init__(f"{func_name} returned {status.name} ({status.value})") + + +def _check_status(func_name: str, status: int) -> Fmi3Status: + s = Fmi3Status(status) + if s in (Fmi3Status.ERROR, Fmi3Status.FATAL): + raise Fmi3Error(func_name, s) + return s + + +# --------------------------------------------------------------------------- +# FMI3 Instance wrapper +# --------------------------------------------------------------------------- +class Fmi3Slave: + """Low-level wrapper around an FMI 3.0 shared library instance. + + This class binds all FMI 3.0 C functions via ctypes and provides + thin Python methods that handle type conversions automatically. + + Args: + path: Path to an ``.fmu`` archive **or** an already-extracted FMU + directory that contains ``binaries/``. + model_identifier: The model identifier -- i.e. the shared-library + file name without extension (e.g. ``"BouncingBall"``). + binary_dir: Override for the platform subfolder inside + ``binaries/``. The FMI 3.0 standard uses platform tuples like + ``x86_64-darwin``, ``aarch64-linux``, ``x86_64-windows``. + When *None*, the standard tuple for the current platform is + tried first, then all subdirectories are scanned as fallback. + unpack_dir: Where to extract the ``.fmu`` archive. If *None* a + temporary directory is used (cleaned up on context-manager + exit or garbage collection). + + Typical Co-Simulation usage:: + + slave = Fmi3Slave("BouncingBall.fmu", + model_identifier="BouncingBall") + + slave.instantiate_co_simulation( + "inst1", + instantiation_token="{...}", + ) + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave.exit_initialization_mode() + + for t in ...: + slave.do_step(t, step_size) + values = slave.get_float64([vr1, vr2]) + + slave.terminate() + slave.free_instance() + + Typical Model Exchange usage:: + + slave = Fmi3Slave("BouncingBall.fmu", + model_identifier="BouncingBall") + + slave.instantiate_model_exchange( + "inst1", + instantiation_token="{...}", + ) + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + # Initial event iteration + while True: + result = slave.update_discrete_states() + if not result.discrete_states_need_update: + break + + slave.enter_continuous_time_mode() + + # Integration loop + slave.set_time(t) + derivs = slave.get_continuous_state_derivatives(nx) + ... + + slave.terminate() + slave.free_instance() + """ + + def __init__( + self, + path: str | Path, + *, + model_identifier: str, + binary_dir: str | None = None, + unpack_dir: str | Path | None = None, + ) -> None: + self._path = Path(path) + self._tmpdir: tempfile.TemporaryDirectory[str] | None = None + self._instance: c_void_p | None = None + self._dll: CDLL | None = None + + # Determine whether we got an .fmu archive or an extracted directory + if self._path.suffix == ".fmu": + if unpack_dir is not None: + self._extract_dir = Path(unpack_dir) + self._extract_dir.mkdir(parents=True, exist_ok=True) + else: + self._tmpdir = tempfile.TemporaryDirectory(prefix="fmuloader_") + self._extract_dir = Path(self._tmpdir.name) + with zipfile.ZipFile(self._path) as zf: + zf.extractall(self._extract_dir) + else: + # Assume it's already an extracted directory + self._extract_dir = self._path + + self._model_identifier = model_identifier + + # Load shared library + binaries_dir = self._extract_dir / "binaries" + lib_path = _find_binary(binaries_dir, model_identifier, binary_dir) + self._dll = CDLL(str(lib_path)) + + # Bind all FMI 3.0 functions + self._bind_functions() + + # ------------------------------------------------------------------ + # Function binding + # ------------------------------------------------------------------ + def _bind_functions(self) -> None: + """Bind all FMI 3.0 C functions from the shared library.""" + dll = self._dll + assert dll is not None + + # ---- Common functions ---- + self._fmi3GetVersion = dll.fmi3GetVersion + self._fmi3GetVersion.restype = c_char_p + self._fmi3GetVersion.argtypes = [] + + self._fmi3SetDebugLogging = dll.fmi3SetDebugLogging + self._fmi3SetDebugLogging.restype = c_int + self._fmi3SetDebugLogging.argtypes = [ + fmi3Instance, + fmi3Boolean, + c_size_t, + POINTER(fmi3String), + ] + + # ---- Instantiation functions ---- + self._fmi3InstantiateModelExchange = dll.fmi3InstantiateModelExchange + self._fmi3InstantiateModelExchange.restype = fmi3Instance + self._fmi3InstantiateModelExchange.argtypes = [ + fmi3String, # instanceName + fmi3String, # instantiationToken + fmi3String, # resourcePath + fmi3Boolean, # visible + fmi3Boolean, # loggingOn + fmi3InstanceEnvironment, # instanceEnvironment + _fmi3LogMessageCallback, # logMessage + ] + + self._fmi3InstantiateCoSimulation = dll.fmi3InstantiateCoSimulation + self._fmi3InstantiateCoSimulation.restype = fmi3Instance + self._fmi3InstantiateCoSimulation.argtypes = [ + fmi3String, # instanceName + fmi3String, # instantiationToken + fmi3String, # resourcePath + fmi3Boolean, # visible + fmi3Boolean, # loggingOn + fmi3Boolean, # eventModeUsed + fmi3Boolean, # earlyReturnAllowed + POINTER(fmi3ValueReference), # requiredIntermediateVariables + c_size_t, # nRequiredIntermediateVariables + fmi3InstanceEnvironment, # instanceEnvironment + _fmi3LogMessageCallback, # logMessage + _fmi3IntermediateUpdateCallback, # intermediateUpdate + ] + + self._fmi3InstantiateScheduledExecution = dll.fmi3InstantiateScheduledExecution + self._fmi3InstantiateScheduledExecution.restype = fmi3Instance + self._fmi3InstantiateScheduledExecution.argtypes = [ + fmi3String, # instanceName + fmi3String, # instantiationToken + fmi3String, # resourcePath + fmi3Boolean, # visible + fmi3Boolean, # loggingOn + fmi3InstanceEnvironment, # instanceEnvironment + _fmi3LogMessageCallback, # logMessage + _fmi3ClockUpdateCallback, # clockUpdate + _fmi3LockPreemptionCallback, # lockPreemption + _fmi3UnlockPreemptionCallback, # unlockPreemption + ] + + self._fmi3FreeInstance = dll.fmi3FreeInstance + self._fmi3FreeInstance.restype = None + self._fmi3FreeInstance.argtypes = [fmi3Instance] + + # ---- Initialization / lifecycle ---- + self._fmi3EnterInitializationMode = dll.fmi3EnterInitializationMode + self._fmi3EnterInitializationMode.restype = c_int + self._fmi3EnterInitializationMode.argtypes = [ + fmi3Instance, + fmi3Boolean, # toleranceDefined + fmi3Float64, # tolerance + fmi3Float64, # startTime + fmi3Boolean, # stopTimeDefined + fmi3Float64, # stopTime + ] + + self._fmi3ExitInitializationMode = dll.fmi3ExitInitializationMode + self._fmi3ExitInitializationMode.restype = c_int + self._fmi3ExitInitializationMode.argtypes = [fmi3Instance] + + self._fmi3EnterEventMode = dll.fmi3EnterEventMode + self._fmi3EnterEventMode.restype = c_int + self._fmi3EnterEventMode.argtypes = [fmi3Instance] + + self._fmi3Terminate = dll.fmi3Terminate + self._fmi3Terminate.restype = c_int + self._fmi3Terminate.argtypes = [fmi3Instance] + + self._fmi3Reset = dll.fmi3Reset + self._fmi3Reset.restype = c_int + self._fmi3Reset.argtypes = [fmi3Instance] + + # ---- Getters ---- + # fmi3GetFloat32 + self._fmi3GetFloat32 = dll.fmi3GetFloat32 + self._fmi3GetFloat32.restype = c_int + self._fmi3GetFloat32.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float32), + c_size_t, + ] + + # fmi3GetFloat64 + self._fmi3GetFloat64 = dll.fmi3GetFloat64 + self._fmi3GetFloat64.restype = c_int + self._fmi3GetFloat64.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float64), + c_size_t, + ] + + # fmi3GetInt8 + self._fmi3GetInt8 = dll.fmi3GetInt8 + self._fmi3GetInt8.restype = c_int + self._fmi3GetInt8.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int8), + c_size_t, + ] + + # fmi3GetUInt8 + self._fmi3GetUInt8 = dll.fmi3GetUInt8 + self._fmi3GetUInt8.restype = c_int + self._fmi3GetUInt8.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt8), + c_size_t, + ] + + # fmi3GetInt16 + self._fmi3GetInt16 = dll.fmi3GetInt16 + self._fmi3GetInt16.restype = c_int + self._fmi3GetInt16.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int16), + c_size_t, + ] + + # fmi3GetUInt16 + self._fmi3GetUInt16 = dll.fmi3GetUInt16 + self._fmi3GetUInt16.restype = c_int + self._fmi3GetUInt16.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt16), + c_size_t, + ] + + # fmi3GetInt32 + self._fmi3GetInt32 = dll.fmi3GetInt32 + self._fmi3GetInt32.restype = c_int + self._fmi3GetInt32.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int32), + c_size_t, + ] + + # fmi3GetUInt32 + self._fmi3GetUInt32 = dll.fmi3GetUInt32 + self._fmi3GetUInt32.restype = c_int + self._fmi3GetUInt32.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt32), + c_size_t, + ] + + # fmi3GetInt64 + self._fmi3GetInt64 = dll.fmi3GetInt64 + self._fmi3GetInt64.restype = c_int + self._fmi3GetInt64.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int64), + c_size_t, + ] + + # fmi3GetUInt64 + self._fmi3GetUInt64 = dll.fmi3GetUInt64 + self._fmi3GetUInt64.restype = c_int + self._fmi3GetUInt64.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt64), + c_size_t, + ] + + # fmi3GetBoolean + self._fmi3GetBoolean = dll.fmi3GetBoolean + self._fmi3GetBoolean.restype = c_int + self._fmi3GetBoolean.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Boolean), + c_size_t, + ] + + # fmi3GetString + self._fmi3GetString = dll.fmi3GetString + self._fmi3GetString.restype = c_int + self._fmi3GetString.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3String), + c_size_t, + ] + + # fmi3GetBinary + self._fmi3GetBinary = dll.fmi3GetBinary + self._fmi3GetBinary.restype = c_int + self._fmi3GetBinary.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(c_size_t), # valueSizes + POINTER(fmi3Binary), # values + c_size_t, + ] + + # fmi3GetClock + self._fmi3GetClock = dll.fmi3GetClock + self._fmi3GetClock.restype = c_int + self._fmi3GetClock.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Clock), + ] + + # ---- Setters ---- + # fmi3SetFloat32 + self._fmi3SetFloat32 = dll.fmi3SetFloat32 + self._fmi3SetFloat32.restype = c_int + self._fmi3SetFloat32.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float32), + c_size_t, + ] + + # fmi3SetFloat64 + self._fmi3SetFloat64 = dll.fmi3SetFloat64 + self._fmi3SetFloat64.restype = c_int + self._fmi3SetFloat64.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float64), + c_size_t, + ] + + # fmi3SetInt8 + self._fmi3SetInt8 = dll.fmi3SetInt8 + self._fmi3SetInt8.restype = c_int + self._fmi3SetInt8.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int8), + c_size_t, + ] + + # fmi3SetUInt8 + self._fmi3SetUInt8 = dll.fmi3SetUInt8 + self._fmi3SetUInt8.restype = c_int + self._fmi3SetUInt8.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt8), + c_size_t, + ] + + # fmi3SetInt16 + self._fmi3SetInt16 = dll.fmi3SetInt16 + self._fmi3SetInt16.restype = c_int + self._fmi3SetInt16.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int16), + c_size_t, + ] + + # fmi3SetUInt16 + self._fmi3SetUInt16 = dll.fmi3SetUInt16 + self._fmi3SetUInt16.restype = c_int + self._fmi3SetUInt16.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt16), + c_size_t, + ] + + # fmi3SetInt32 + self._fmi3SetInt32 = dll.fmi3SetInt32 + self._fmi3SetInt32.restype = c_int + self._fmi3SetInt32.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int32), + c_size_t, + ] + + # fmi3SetUInt32 + self._fmi3SetUInt32 = dll.fmi3SetUInt32 + self._fmi3SetUInt32.restype = c_int + self._fmi3SetUInt32.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt32), + c_size_t, + ] + + # fmi3SetInt64 + self._fmi3SetInt64 = dll.fmi3SetInt64 + self._fmi3SetInt64.restype = c_int + self._fmi3SetInt64.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int64), + c_size_t, + ] + + # fmi3SetUInt64 + self._fmi3SetUInt64 = dll.fmi3SetUInt64 + self._fmi3SetUInt64.restype = c_int + self._fmi3SetUInt64.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt64), + c_size_t, + ] + + # fmi3SetBoolean + self._fmi3SetBoolean = dll.fmi3SetBoolean + self._fmi3SetBoolean.restype = c_int + self._fmi3SetBoolean.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Boolean), + c_size_t, + ] + + # fmi3SetString + self._fmi3SetString = dll.fmi3SetString + self._fmi3SetString.restype = c_int + self._fmi3SetString.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3String), + c_size_t, + ] + + # fmi3SetBinary + self._fmi3SetBinary = dll.fmi3SetBinary + self._fmi3SetBinary.restype = c_int + self._fmi3SetBinary.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(c_size_t), # valueSizes + POINTER(fmi3Binary), # values + c_size_t, + ] + + # fmi3SetClock + self._fmi3SetClock = dll.fmi3SetClock + self._fmi3SetClock.restype = c_int + self._fmi3SetClock.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Clock), + ] + + # ---- Variable dependencies ---- + self._fmi3GetNumberOfVariableDependencies = ( + dll.fmi3GetNumberOfVariableDependencies + ) + self._fmi3GetNumberOfVariableDependencies.restype = c_int + self._fmi3GetNumberOfVariableDependencies.argtypes = [ + fmi3Instance, + fmi3ValueReference, + POINTER(c_size_t), + ] + + self._fmi3GetVariableDependencies = dll.fmi3GetVariableDependencies + self._fmi3GetVariableDependencies.restype = c_int + self._fmi3GetVariableDependencies.argtypes = [ + fmi3Instance, + fmi3ValueReference, + POINTER(c_size_t), # elementIndicesOfDependent + POINTER(fmi3ValueReference), # independents + POINTER(c_size_t), # elementIndicesOfIndependents + POINTER(c_int), # dependencyKinds + c_size_t, + ] + + # ---- FMU state ---- + self._fmi3GetFMUState = dll.fmi3GetFMUState + self._fmi3GetFMUState.restype = c_int + self._fmi3GetFMUState.argtypes = [ + fmi3Instance, + POINTER(fmi3FMUState), + ] + + self._fmi3SetFMUState = dll.fmi3SetFMUState + self._fmi3SetFMUState.restype = c_int + self._fmi3SetFMUState.argtypes = [fmi3Instance, fmi3FMUState] + + self._fmi3FreeFMUState = dll.fmi3FreeFMUState + self._fmi3FreeFMUState.restype = c_int + self._fmi3FreeFMUState.argtypes = [ + fmi3Instance, + POINTER(fmi3FMUState), + ] + + self._fmi3SerializedFMUStateSize = dll.fmi3SerializedFMUStateSize + self._fmi3SerializedFMUStateSize.restype = c_int + self._fmi3SerializedFMUStateSize.argtypes = [ + fmi3Instance, + fmi3FMUState, + POINTER(c_size_t), + ] + + self._fmi3SerializeFMUState = dll.fmi3SerializeFMUState + self._fmi3SerializeFMUState.restype = c_int + self._fmi3SerializeFMUState.argtypes = [ + fmi3Instance, + fmi3FMUState, + POINTER(fmi3Byte), + c_size_t, + ] + + self._fmi3DeserializeFMUState = dll.fmi3DeserializeFMUState + self._fmi3DeserializeFMUState.restype = c_int + self._fmi3DeserializeFMUState.argtypes = [ + fmi3Instance, + POINTER(fmi3Byte), + c_size_t, + POINTER(fmi3FMUState), + ] + + # ---- Directional / adjoint derivatives ---- + self._fmi3GetDirectionalDerivative = dll.fmi3GetDirectionalDerivative + self._fmi3GetDirectionalDerivative.restype = c_int + self._fmi3GetDirectionalDerivative.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), # unknowns + c_size_t, + POINTER(fmi3ValueReference), # knowns + c_size_t, + POINTER(fmi3Float64), # seed + c_size_t, + POINTER(fmi3Float64), # sensitivity + c_size_t, + ] + + self._fmi3GetAdjointDerivative = dll.fmi3GetAdjointDerivative + self._fmi3GetAdjointDerivative.restype = c_int + self._fmi3GetAdjointDerivative.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), # unknowns + c_size_t, + POINTER(fmi3ValueReference), # knowns + c_size_t, + POINTER(fmi3Float64), # seed + c_size_t, + POINTER(fmi3Float64), # sensitivity + c_size_t, + ] + + # ---- Configuration mode ---- + self._fmi3EnterConfigurationMode = dll.fmi3EnterConfigurationMode + self._fmi3EnterConfigurationMode.restype = c_int + self._fmi3EnterConfigurationMode.argtypes = [fmi3Instance] + + self._fmi3ExitConfigurationMode = dll.fmi3ExitConfigurationMode + self._fmi3ExitConfigurationMode.restype = c_int + self._fmi3ExitConfigurationMode.argtypes = [fmi3Instance] + + # ---- Clock functions ---- + self._fmi3GetIntervalDecimal = dll.fmi3GetIntervalDecimal + self._fmi3GetIntervalDecimal.restype = c_int + self._fmi3GetIntervalDecimal.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float64), + POINTER(c_int), # fmi3IntervalQualifier + ] + + self._fmi3GetIntervalFraction = dll.fmi3GetIntervalFraction + self._fmi3GetIntervalFraction.restype = c_int + self._fmi3GetIntervalFraction.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt64), + POINTER(fmi3UInt64), + POINTER(c_int), # fmi3IntervalQualifier + ] + + self._fmi3GetShiftDecimal = dll.fmi3GetShiftDecimal + self._fmi3GetShiftDecimal.restype = c_int + self._fmi3GetShiftDecimal.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float64), + ] + + self._fmi3GetShiftFraction = dll.fmi3GetShiftFraction + self._fmi3GetShiftFraction.restype = c_int + self._fmi3GetShiftFraction.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt64), + POINTER(fmi3UInt64), + ] + + self._fmi3SetIntervalDecimal = dll.fmi3SetIntervalDecimal + self._fmi3SetIntervalDecimal.restype = c_int + self._fmi3SetIntervalDecimal.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float64), + ] + + self._fmi3SetIntervalFraction = dll.fmi3SetIntervalFraction + self._fmi3SetIntervalFraction.restype = c_int + self._fmi3SetIntervalFraction.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt64), + POINTER(fmi3UInt64), + ] + + self._fmi3SetShiftDecimal = dll.fmi3SetShiftDecimal + self._fmi3SetShiftDecimal.restype = c_int + self._fmi3SetShiftDecimal.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Float64), + ] + + self._fmi3SetShiftFraction = dll.fmi3SetShiftFraction + self._fmi3SetShiftFraction.restype = c_int + self._fmi3SetShiftFraction.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3UInt64), + POINTER(fmi3UInt64), + ] + + self._fmi3EvaluateDiscreteStates = dll.fmi3EvaluateDiscreteStates + self._fmi3EvaluateDiscreteStates.restype = c_int + self._fmi3EvaluateDiscreteStates.argtypes = [fmi3Instance] + + self._fmi3UpdateDiscreteStates = dll.fmi3UpdateDiscreteStates + self._fmi3UpdateDiscreteStates.restype = c_int + self._fmi3UpdateDiscreteStates.argtypes = [ + fmi3Instance, + POINTER(fmi3Boolean), # discreteStatesNeedUpdate + POINTER(fmi3Boolean), # terminateSimulation + POINTER(fmi3Boolean), # nominalsOfContinuousStatesChanged + POINTER(fmi3Boolean), # valuesOfContinuousStatesChanged + POINTER(fmi3Boolean), # nextEventTimeDefined + POINTER(fmi3Float64), # nextEventTime + ] + + # ---- Model Exchange functions ---- + self._fmi3EnterContinuousTimeMode = dll.fmi3EnterContinuousTimeMode + self._fmi3EnterContinuousTimeMode.restype = c_int + self._fmi3EnterContinuousTimeMode.argtypes = [fmi3Instance] + + self._fmi3CompletedIntegratorStep = dll.fmi3CompletedIntegratorStep + self._fmi3CompletedIntegratorStep.restype = c_int + self._fmi3CompletedIntegratorStep.argtypes = [ + fmi3Instance, + fmi3Boolean, # noSetFMUStatePriorToCurrentPoint + POINTER(fmi3Boolean), # enterEventMode + POINTER(fmi3Boolean), # terminateSimulation + ] + + self._fmi3SetTime = dll.fmi3SetTime + self._fmi3SetTime.restype = c_int + self._fmi3SetTime.argtypes = [fmi3Instance, fmi3Float64] + + self._fmi3SetContinuousStates = dll.fmi3SetContinuousStates + self._fmi3SetContinuousStates.restype = c_int + self._fmi3SetContinuousStates.argtypes = [ + fmi3Instance, + POINTER(fmi3Float64), + c_size_t, + ] + + self._fmi3GetContinuousStateDerivatives = dll.fmi3GetContinuousStateDerivatives + self._fmi3GetContinuousStateDerivatives.restype = c_int + self._fmi3GetContinuousStateDerivatives.argtypes = [ + fmi3Instance, + POINTER(fmi3Float64), + c_size_t, + ] + + self._fmi3GetEventIndicators = dll.fmi3GetEventIndicators + self._fmi3GetEventIndicators.restype = c_int + self._fmi3GetEventIndicators.argtypes = [ + fmi3Instance, + POINTER(fmi3Float64), + c_size_t, + ] + + self._fmi3GetContinuousStates = dll.fmi3GetContinuousStates + self._fmi3GetContinuousStates.restype = c_int + self._fmi3GetContinuousStates.argtypes = [ + fmi3Instance, + POINTER(fmi3Float64), + c_size_t, + ] + + self._fmi3GetNominalsOfContinuousStates = dll.fmi3GetNominalsOfContinuousStates + self._fmi3GetNominalsOfContinuousStates.restype = c_int + self._fmi3GetNominalsOfContinuousStates.argtypes = [ + fmi3Instance, + POINTER(fmi3Float64), + c_size_t, + ] + + self._fmi3GetNumberOfEventIndicators = dll.fmi3GetNumberOfEventIndicators + self._fmi3GetNumberOfEventIndicators.restype = c_int + self._fmi3GetNumberOfEventIndicators.argtypes = [ + fmi3Instance, + POINTER(c_size_t), + ] + + self._fmi3GetNumberOfContinuousStates = dll.fmi3GetNumberOfContinuousStates + self._fmi3GetNumberOfContinuousStates.restype = c_int + self._fmi3GetNumberOfContinuousStates.argtypes = [ + fmi3Instance, + POINTER(c_size_t), + ] + + # ---- Co-Simulation functions ---- + self._fmi3EnterStepMode = dll.fmi3EnterStepMode + self._fmi3EnterStepMode.restype = c_int + self._fmi3EnterStepMode.argtypes = [fmi3Instance] + + self._fmi3GetOutputDerivatives = dll.fmi3GetOutputDerivatives + self._fmi3GetOutputDerivatives.restype = c_int + self._fmi3GetOutputDerivatives.argtypes = [ + fmi3Instance, + POINTER(fmi3ValueReference), + c_size_t, + POINTER(fmi3Int32), + POINTER(fmi3Float64), + c_size_t, + ] + + self._fmi3DoStep = dll.fmi3DoStep + self._fmi3DoStep.restype = c_int + self._fmi3DoStep.argtypes = [ + fmi3Instance, + fmi3Float64, # currentCommunicationPoint + fmi3Float64, # communicationStepSize + fmi3Boolean, # noSetFMUStatePriorToCurrentPoint + POINTER(fmi3Boolean), # eventHandlingNeeded + POINTER(fmi3Boolean), # terminateSimulation + POINTER(fmi3Boolean), # earlyReturn + POINTER(fmi3Float64), # lastSuccessfulTime + ] + + # ---- Scheduled Execution functions ---- + self._fmi3ActivateModelPartition = dll.fmi3ActivateModelPartition + self._fmi3ActivateModelPartition.restype = c_int + self._fmi3ActivateModelPartition.argtypes = [ + fmi3Instance, + fmi3ValueReference, # clockReference + fmi3Float64, # activationTime + ] + + # ------------------------------------------------------------------ + # Helper to convert Python lists → ctypes arrays + # ------------------------------------------------------------------ + @staticmethod + def _vr_array(vrs: Sequence[int]) -> ctypes.Array[c_uint32]: + arr_type = fmi3ValueReference * len(vrs) + return arr_type(*vrs) + + @staticmethod + def _float64_array(vals: Sequence[float]) -> ctypes.Array[c_double]: + arr_type = fmi3Float64 * len(vals) + return arr_type(*vals) + + @staticmethod + def _float32_array(vals: Sequence[float]) -> ctypes.Array[c_float]: + arr_type = fmi3Float32 * len(vals) + return arr_type(*vals) + + @staticmethod + def _int32_array(vals: Sequence[int]) -> ctypes.Array[c_int32]: + arr_type = fmi3Int32 * len(vals) + return arr_type(*vals) + + @staticmethod + def _uint32_array(vals: Sequence[int]) -> ctypes.Array[c_uint32]: + arr_type = fmi3UInt32 * len(vals) + return arr_type(*vals) + + @staticmethod + def _bool_array(vals: Sequence[bool]) -> ctypes.Array[c_bool]: + arr_type = fmi3Boolean * len(vals) + return arr_type(*vals) + + @staticmethod + def _string_array(vals: Sequence[str]) -> ctypes.Array[c_char_p]: + arr_type = fmi3String * len(vals) + return arr_type(*(v.encode("utf-8") for v in vals)) + + # ------------------------------------------------------------------ + # Common functions + # ------------------------------------------------------------------ + def get_version(self) -> str: + """Return the FMI version string (e.g. ``'3.0'``).""" + return self._fmi3GetVersion().decode() + + def set_debug_logging( + self, + logging_on: bool, + categories: Sequence[str] | None = None, + ) -> Fmi3Status: + cats: Sequence[str] = categories or [] + n = len(cats) + if n > 0: + arr = self._string_array(cats) + status = self._fmi3SetDebugLogging( + self._instance, + logging_on, + n, + arr, + ) + else: + status = self._fmi3SetDebugLogging( + self._instance, + logging_on, + 0, + None, + ) + return _check_status("fmi3SetDebugLogging", status) + + # ------------------------------------------------------------------ + # Instantiation + # ------------------------------------------------------------------ + def instantiate_model_exchange( + self, + instance_name: str, + *, + instantiation_token: str, + resource_path: str | None = None, + visible: bool = False, + logging_on: bool = False, + ) -> None: + """Instantiate a Model Exchange FMU. + + Args: + instance_name: Name for this FMU instance. + instantiation_token: The instantiationToken from + modelDescription.xml. + resource_path: Absolute file path to the ``resources/`` + directory (with trailing separator). Derived + automatically when *None*. + visible: Whether a simulator UI should be shown. + logging_on: Whether debug logging is initially enabled. + """ + rp = self._resolve_resource_path(resource_path) + instance = self._fmi3InstantiateModelExchange( + instance_name.encode("utf-8"), + instantiation_token.encode("utf-8"), + rp, + visible, + logging_on, + None, # instanceEnvironment + _LOGGER_FUNC, + ) + if not instance: + raise RuntimeError( + f"fmi3InstantiateModelExchange returned NULL for {instance_name!r}" + ) + self._instance = instance + + def instantiate_co_simulation( + self, + instance_name: str, + *, + instantiation_token: str, + resource_path: str | None = None, + visible: bool = False, + logging_on: bool = False, + event_mode_used: bool = False, + early_return_allowed: bool = False, + required_intermediate_variables: Sequence[int] | None = None, + intermediate_update_callback: Any | None = None, + ) -> None: + """Instantiate a Co-Simulation FMU. + + Args: + instance_name: Name for this FMU instance. + instantiation_token: The instantiationToken from + modelDescription.xml. + resource_path: Absolute file path to the ``resources/`` + directory. Derived automatically when *None*. + visible: Whether a simulator UI should be shown. + logging_on: Whether debug logging is initially enabled. + event_mode_used: Whether the importer will use Event Mode. + early_return_allowed: Whether early return from + ``fmi3DoStep`` is allowed. + required_intermediate_variables: Value references of + variables that need intermediate access. + intermediate_update_callback: Optional callback. When + *None*, a NULL pointer is passed. + """ + rp = self._resolve_resource_path(resource_path) + + if required_intermediate_variables: + n_riv = len(required_intermediate_variables) + riv_arr = self._vr_array(required_intermediate_variables) + else: + n_riv = 0 + riv_arr = None # type: ignore[assignment] + + iu_cb = ( + _fmi3IntermediateUpdateCallback(intermediate_update_callback) + if intermediate_update_callback is not None + else _fmi3IntermediateUpdateCallback(0) + ) + + instance = self._fmi3InstantiateCoSimulation( + instance_name.encode("utf-8"), + instantiation_token.encode("utf-8"), + rp, + visible, + logging_on, + event_mode_used, + early_return_allowed, + riv_arr, + n_riv, + None, # instanceEnvironment + _LOGGER_FUNC, + iu_cb, + ) + if not instance: + raise RuntimeError( + f"fmi3InstantiateCoSimulation returned NULL for {instance_name!r}" + ) + self._instance = instance + + def instantiate_scheduled_execution( + self, + instance_name: str, + *, + instantiation_token: str, + resource_path: str | None = None, + visible: bool = False, + logging_on: bool = False, + clock_update_callback: Any | None = None, + lock_preemption_callback: Any | None = None, + unlock_preemption_callback: Any | None = None, + ) -> None: + """Instantiate a Scheduled Execution FMU. + + Args: + instance_name: Name for this FMU instance. + instantiation_token: The instantiationToken from + modelDescription.xml. + resource_path: Absolute file path to the ``resources/`` + directory. Derived automatically when *None*. + visible: Whether a simulator UI should be shown. + logging_on: Whether debug logging is initially enabled. + clock_update_callback: Callback for clock updates. + lock_preemption_callback: Callback to lock preemption. + unlock_preemption_callback: Callback to unlock preemption. + """ + rp = self._resolve_resource_path(resource_path) + + cu_cb = ( + _fmi3ClockUpdateCallback(clock_update_callback) + if clock_update_callback is not None + else _fmi3ClockUpdateCallback(0) + ) + lp_cb = ( + _fmi3LockPreemptionCallback(lock_preemption_callback) + if lock_preemption_callback is not None + else _fmi3LockPreemptionCallback(0) + ) + up_cb = ( + _fmi3UnlockPreemptionCallback(unlock_preemption_callback) + if unlock_preemption_callback is not None + else _fmi3UnlockPreemptionCallback(0) + ) + + instance = self._fmi3InstantiateScheduledExecution( + instance_name.encode("utf-8"), + instantiation_token.encode("utf-8"), + rp, + visible, + logging_on, + None, # instanceEnvironment + _LOGGER_FUNC, + cu_cb, + lp_cb, + up_cb, + ) + if not instance: + raise RuntimeError( + f"fmi3InstantiateScheduledExecution returned NULL for {instance_name!r}" + ) + self._instance = instance + + def free_instance(self) -> None: + """Free the FMU instance and release resources.""" + if self._instance is not None: + self._fmi3FreeInstance(self._instance) + self._instance = None + + def _resolve_resource_path(self, resource_path: str | None) -> bytes | None: + """Resolve resource path to an encoded bytes string or None.""" + if resource_path is not None: + return resource_path.encode("utf-8") + resources_dir = self._extract_dir / "resources" + if resources_dir.exists(): + # FMI 3.0 uses absolute file path (not URI), with trailing sep + return (str(resources_dir.resolve()) + "/").encode("utf-8") + return None + + # ------------------------------------------------------------------ + # Initialization / lifecycle + # ------------------------------------------------------------------ + def enter_initialization_mode( + self, + start_time: float = 0.0, + stop_time: float | None = None, + tolerance: float | None = None, + ) -> Fmi3Status: + tolerance_defined = tolerance is not None + tol_val = tolerance if tolerance is not None else 0.0 + stop_defined = stop_time is not None + stop_val = stop_time if stop_time is not None else 0.0 + + status = self._fmi3EnterInitializationMode( + self._instance, + tolerance_defined, + tol_val, + start_time, + stop_defined, + stop_val, + ) + return _check_status("fmi3EnterInitializationMode", status) + + def exit_initialization_mode(self) -> Fmi3Status: + status = self._fmi3ExitInitializationMode(self._instance) + return _check_status("fmi3ExitInitializationMode", status) + + def enter_event_mode(self) -> Fmi3Status: + status = self._fmi3EnterEventMode(self._instance) + return _check_status("fmi3EnterEventMode", status) + + def terminate(self) -> Fmi3Status: + status = self._fmi3Terminate(self._instance) + return _check_status("fmi3Terminate", status) + + def reset(self) -> Fmi3Status: + status = self._fmi3Reset(self._instance) + return _check_status("fmi3Reset", status) + + # ------------------------------------------------------------------ + # Getting variable values + # ------------------------------------------------------------------ + def get_float32(self, vrs: Sequence[int]) -> list[float]: + n = len(vrs) + values = (fmi3Float32 * n)() + status = self._fmi3GetFloat32( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetFloat32", status) + return list(values) + + def get_float64(self, vrs: Sequence[int]) -> list[float]: + n = len(vrs) + values = (fmi3Float64 * n)() + status = self._fmi3GetFloat64( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetFloat64", status) + return list(values) + + def get_int8(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3Int8 * n)() + status = self._fmi3GetInt8( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetInt8", status) + return list(values) + + def get_uint8(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3UInt8 * n)() + status = self._fmi3GetUInt8( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetUInt8", status) + return list(values) + + def get_int16(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3Int16 * n)() + status = self._fmi3GetInt16( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetInt16", status) + return list(values) + + def get_uint16(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3UInt16 * n)() + status = self._fmi3GetUInt16( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetUInt16", status) + return list(values) + + def get_int32(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3Int32 * n)() + status = self._fmi3GetInt32( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetInt32", status) + return list(values) + + def get_uint32(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3UInt32 * n)() + status = self._fmi3GetUInt32( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetUInt32", status) + return list(values) + + def get_int64(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3Int64 * n)() + status = self._fmi3GetInt64( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetInt64", status) + return list(values) + + def get_uint64(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi3UInt64 * n)() + status = self._fmi3GetUInt64( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetUInt64", status) + return list(values) + + def get_boolean(self, vrs: Sequence[int]) -> list[bool]: + n = len(vrs) + values = (fmi3Boolean * n)() + status = self._fmi3GetBoolean( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetBoolean", status) + return [bool(v) for v in values] + + def get_string(self, vrs: Sequence[int]) -> list[str]: + n = len(vrs) + values = (fmi3String * n)() + status = self._fmi3GetString( + self._instance, + self._vr_array(vrs), + n, + values, + n, + ) + _check_status("fmi3GetString", status) + return [v.decode("utf-8") if v else "" for v in values] + + def get_binary(self, vrs: Sequence[int]) -> list[bytes]: + """Get binary variable values. + + Returns: + A list of ``bytes`` objects, one per value reference. + """ + n = len(vrs) + sizes = (c_size_t * n)() + values = (fmi3Binary * n)() + status = self._fmi3GetBinary( + self._instance, + self._vr_array(vrs), + n, + sizes, + values, + n, + ) + _check_status("fmi3GetBinary", status) + result = [] + for i in range(n): + if values[i] and sizes[i]: + result.append(bytes(values[i][: sizes[i]])) + else: + result.append(b"") + return result + + def get_clock(self, vrs: Sequence[int]) -> list[bool]: + n = len(vrs) + values = (fmi3Clock * n)() + status = self._fmi3GetClock( + self._instance, + self._vr_array(vrs), + n, + values, + ) + _check_status("fmi3GetClock", status) + return [bool(v) for v in values] + + # ------------------------------------------------------------------ + # Setting variable values + # ------------------------------------------------------------------ + def set_float32( + self, + vrs: Sequence[int], + values: Sequence[float], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetFloat32( + self._instance, + self._vr_array(vrs), + n, + self._float32_array(values), + len(values), + ) + return _check_status("fmi3SetFloat32", status) + + def set_float64( + self, + vrs: Sequence[int], + values: Sequence[float], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetFloat64( + self._instance, + self._vr_array(vrs), + n, + self._float64_array(values), + len(values), + ) + return _check_status("fmi3SetFloat64", status) + + def set_int8( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3Int8 * len(values))(*values) + status = self._fmi3SetInt8( + self._instance, + self._vr_array(vrs), + n, + arr, + len(values), + ) + return _check_status("fmi3SetInt8", status) + + def set_uint8( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3UInt8 * len(values))(*values) + status = self._fmi3SetUInt8( + self._instance, + self._vr_array(vrs), + n, + arr, + len(values), + ) + return _check_status("fmi3SetUInt8", status) + + def set_int16( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3Int16 * len(values))(*values) + status = self._fmi3SetInt16( + self._instance, + self._vr_array(vrs), + n, + arr, + len(values), + ) + return _check_status("fmi3SetInt16", status) + + def set_uint16( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3UInt16 * len(values))(*values) + status = self._fmi3SetUInt16( + self._instance, + self._vr_array(vrs), + n, + arr, + len(values), + ) + return _check_status("fmi3SetUInt16", status) + + def set_int32( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetInt32( + self._instance, + self._vr_array(vrs), + n, + self._int32_array(values), + len(values), + ) + return _check_status("fmi3SetInt32", status) + + def set_uint32( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetUInt32( + self._instance, + self._vr_array(vrs), + n, + self._uint32_array(values), + len(values), + ) + return _check_status("fmi3SetUInt32", status) + + def set_int64( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3Int64 * len(values))(*values) + status = self._fmi3SetInt64( + self._instance, + self._vr_array(vrs), + n, + arr, + len(values), + ) + return _check_status("fmi3SetInt64", status) + + def set_uint64( + self, + vrs: Sequence[int], + values: Sequence[int], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3UInt64 * len(values))(*values) + status = self._fmi3SetUInt64( + self._instance, + self._vr_array(vrs), + n, + arr, + len(values), + ) + return _check_status("fmi3SetUInt64", status) + + def set_boolean( + self, + vrs: Sequence[int], + values: Sequence[bool], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetBoolean( + self._instance, + self._vr_array(vrs), + n, + self._bool_array(values), + len(values), + ) + return _check_status("fmi3SetBoolean", status) + + def set_string( + self, + vrs: Sequence[int], + values: Sequence[str], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetString( + self._instance, + self._vr_array(vrs), + n, + self._string_array(values), + len(values), + ) + return _check_status("fmi3SetString", status) + + def set_binary( + self, + vrs: Sequence[int], + values: Sequence[bytes], + ) -> Fmi3Status: + """Set binary variable values. + + Args: + vrs: Value references. + values: Sequence of ``bytes`` objects. + """ + n = len(vrs) + n_vals = len(values) + sizes = (c_size_t * n_vals)(*(len(v) for v in values)) + ptrs = (fmi3Binary * n_vals)() + for i, v in enumerate(values): + buf = (c_uint8 * len(v))(*v) + ptrs[i] = ctypes.cast(buf, fmi3Binary) + status = self._fmi3SetBinary( + self._instance, + self._vr_array(vrs), + n, + sizes, + ptrs, + n_vals, + ) + return _check_status("fmi3SetBinary", status) + + def set_clock( + self, + vrs: Sequence[int], + values: Sequence[bool], + ) -> Fmi3Status: + n = len(vrs) + arr = (fmi3Clock * n)(*values) + status = self._fmi3SetClock( + self._instance, + self._vr_array(vrs), + n, + arr, + ) + return _check_status("fmi3SetClock", status) + + # ------------------------------------------------------------------ + # FMU State + # ------------------------------------------------------------------ + def get_fmu_state(self) -> c_void_p: + state = fmi3FMUState() + status = self._fmi3GetFMUState(self._instance, byref(state)) + _check_status("fmi3GetFMUState", status) + return state + + def set_fmu_state(self, state: c_void_p) -> Fmi3Status: + status = self._fmi3SetFMUState(self._instance, state) + return _check_status("fmi3SetFMUState", status) + + def free_fmu_state(self, state: c_void_p) -> Fmi3Status: + status = self._fmi3FreeFMUState(self._instance, byref(state)) + return _check_status("fmi3FreeFMUState", status) + + def serialized_fmu_state_size(self, state: c_void_p) -> int: + size = c_size_t() + status = self._fmi3SerializedFMUStateSize( + self._instance, + state, + byref(size), + ) + _check_status("fmi3SerializedFMUStateSize", status) + return size.value + + def serialize_fmu_state(self, state: c_void_p) -> bytes: + size = self.serialized_fmu_state_size(state) + buf = (fmi3Byte * size)() + status = self._fmi3SerializeFMUState( + self._instance, + state, + buf, + size, + ) + _check_status("fmi3SerializeFMUState", status) + return bytes(buf) + + def deserialize_fmu_state(self, data: bytes) -> c_void_p: + size = len(data) + buf = (fmi3Byte * size)(*data) + state = fmi3FMUState() + status = self._fmi3DeserializeFMUState( + self._instance, + buf, + size, + byref(state), + ) + _check_status("fmi3DeserializeFMUState", status) + return state + + # ------------------------------------------------------------------ + # Directional / adjoint derivatives + # ------------------------------------------------------------------ + def get_directional_derivative( + self, + unknowns: Sequence[int], + knowns: Sequence[int], + seed: Sequence[float], + ) -> list[float]: + n_unknowns = len(unknowns) + n_knowns = len(knowns) + n_seed = len(seed) + sensitivity = (fmi3Float64 * n_unknowns)() + status = self._fmi3GetDirectionalDerivative( + self._instance, + self._vr_array(unknowns), + n_unknowns, + self._vr_array(knowns), + n_knowns, + self._float64_array(seed), + n_seed, + sensitivity, + n_unknowns, + ) + _check_status("fmi3GetDirectionalDerivative", status) + return list(sensitivity) + + def get_adjoint_derivative( + self, + unknowns: Sequence[int], + knowns: Sequence[int], + seed: Sequence[float], + ) -> list[float]: + n_unknowns = len(unknowns) + n_knowns = len(knowns) + n_seed = len(seed) + sensitivity = (fmi3Float64 * n_knowns)() + status = self._fmi3GetAdjointDerivative( + self._instance, + self._vr_array(unknowns), + n_unknowns, + self._vr_array(knowns), + n_knowns, + self._float64_array(seed), + n_seed, + sensitivity, + n_knowns, + ) + _check_status("fmi3GetAdjointDerivative", status) + return list(sensitivity) + + # ------------------------------------------------------------------ + # Configuration mode + # ------------------------------------------------------------------ + def enter_configuration_mode(self) -> Fmi3Status: + status = self._fmi3EnterConfigurationMode(self._instance) + return _check_status("fmi3EnterConfigurationMode", status) + + def exit_configuration_mode(self) -> Fmi3Status: + status = self._fmi3ExitConfigurationMode(self._instance) + return _check_status("fmi3ExitConfigurationMode", status) + + # ------------------------------------------------------------------ + # Clock functions + # ------------------------------------------------------------------ + def get_interval_decimal( + self, + vrs: Sequence[int], + ) -> tuple[list[float], list[Fmi3IntervalQualifier]]: + n = len(vrs) + intervals = (fmi3Float64 * n)() + qualifiers = (c_int * n)() + status = self._fmi3GetIntervalDecimal( + self._instance, + self._vr_array(vrs), + n, + intervals, + qualifiers, + ) + _check_status("fmi3GetIntervalDecimal", status) + return ( + list(intervals), + [Fmi3IntervalQualifier(q) for q in qualifiers], + ) + + def get_interval_fraction( + self, + vrs: Sequence[int], + ) -> tuple[list[int], list[int], list[Fmi3IntervalQualifier]]: + n = len(vrs) + counters = (fmi3UInt64 * n)() + resolutions = (fmi3UInt64 * n)() + qualifiers = (c_int * n)() + status = self._fmi3GetIntervalFraction( + self._instance, + self._vr_array(vrs), + n, + counters, + resolutions, + qualifiers, + ) + _check_status("fmi3GetIntervalFraction", status) + return ( + list(counters), + list(resolutions), + [Fmi3IntervalQualifier(q) for q in qualifiers], + ) + + def set_interval_decimal( + self, + vrs: Sequence[int], + intervals: Sequence[float], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetIntervalDecimal( + self._instance, + self._vr_array(vrs), + n, + self._float64_array(intervals), + ) + return _check_status("fmi3SetIntervalDecimal", status) + + def get_shift_decimal(self, vrs: Sequence[int]) -> list[float]: + n = len(vrs) + shifts = (fmi3Float64 * n)() + status = self._fmi3GetShiftDecimal( + self._instance, + self._vr_array(vrs), + n, + shifts, + ) + _check_status("fmi3GetShiftDecimal", status) + return list(shifts) + + def set_shift_decimal( + self, + vrs: Sequence[int], + shifts: Sequence[float], + ) -> Fmi3Status: + n = len(vrs) + status = self._fmi3SetShiftDecimal( + self._instance, + self._vr_array(vrs), + n, + self._float64_array(shifts), + ) + return _check_status("fmi3SetShiftDecimal", status) + + def evaluate_discrete_states(self) -> Fmi3Status: + status = self._fmi3EvaluateDiscreteStates(self._instance) + return _check_status("fmi3EvaluateDiscreteStates", status) + + class UpdateDiscreteStatesResult: + """Result of :meth:`update_discrete_states`.""" + + __slots__ = ( + "discrete_states_need_update", + "terminate_simulation", + "nominals_of_continuous_states_changed", + "values_of_continuous_states_changed", + "next_event_time_defined", + "next_event_time", + ) + + def __init__( + self, + discrete_states_need_update: bool, + terminate_simulation: bool, + nominals_of_continuous_states_changed: bool, + values_of_continuous_states_changed: bool, + next_event_time_defined: bool, + next_event_time: float, + ) -> None: + self.discrete_states_need_update = discrete_states_need_update + self.terminate_simulation = terminate_simulation + self.nominals_of_continuous_states_changed = ( + nominals_of_continuous_states_changed + ) + self.values_of_continuous_states_changed = ( + values_of_continuous_states_changed + ) + self.next_event_time_defined = next_event_time_defined + self.next_event_time = next_event_time + + def update_discrete_states(self) -> UpdateDiscreteStatesResult: + """Call ``fmi3UpdateDiscreteStates``. + + Returns: + An :class:`UpdateDiscreteStatesResult` with the six output + parameters. + """ + dsnu = fmi3Boolean(fmi3False) + ts = fmi3Boolean(fmi3False) + nocsc = fmi3Boolean(fmi3False) + vocsc = fmi3Boolean(fmi3False) + netd = fmi3Boolean(fmi3False) + net = fmi3Float64(0.0) + status = self._fmi3UpdateDiscreteStates( + self._instance, + byref(dsnu), + byref(ts), + byref(nocsc), + byref(vocsc), + byref(netd), + byref(net), + ) + _check_status("fmi3UpdateDiscreteStates", status) + return self.UpdateDiscreteStatesResult( + discrete_states_need_update=bool(dsnu.value), + terminate_simulation=bool(ts.value), + nominals_of_continuous_states_changed=bool(nocsc.value), + values_of_continuous_states_changed=bool(vocsc.value), + next_event_time_defined=bool(netd.value), + next_event_time=net.value, + ) + + # ------------------------------------------------------------------ + # Model Exchange functions + # ------------------------------------------------------------------ + def enter_continuous_time_mode(self) -> Fmi3Status: + status = self._fmi3EnterContinuousTimeMode(self._instance) + return _check_status("fmi3EnterContinuousTimeMode", status) + + def completed_integrator_step( + self, + no_set_fmu_state_prior: bool = True, + ) -> tuple[bool, bool]: + """Call ``fmi3CompletedIntegratorStep``. + + Returns: + ``(enter_event_mode, terminate_simulation)`` booleans. + """ + enter_event = fmi3Boolean(fmi3False) + terminate = fmi3Boolean(fmi3False) + status = self._fmi3CompletedIntegratorStep( + self._instance, + no_set_fmu_state_prior, + byref(enter_event), + byref(terminate), + ) + _check_status("fmi3CompletedIntegratorStep", status) + return bool(enter_event.value), bool(terminate.value) + + def set_time(self, time: float) -> Fmi3Status: + status = self._fmi3SetTime(self._instance, time) + return _check_status("fmi3SetTime", status) + + def set_continuous_states(self, states: Sequence[float]) -> Fmi3Status: + nx = len(states) + status = self._fmi3SetContinuousStates( + self._instance, + self._float64_array(states), + nx, + ) + return _check_status("fmi3SetContinuousStates", status) + + def get_continuous_state_derivatives(self, nx: int) -> list[float]: + """Get state derivatives. + + Args: + nx: Number of continuous states. + """ + derivatives = (fmi3Float64 * nx)() + status = self._fmi3GetContinuousStateDerivatives( + self._instance, + derivatives, + nx, + ) + _check_status("fmi3GetContinuousStateDerivatives", status) + return list(derivatives) + + def get_event_indicators(self, ni: int) -> list[float]: + """Get event indicators. + + Args: + ni: Number of event indicators. + """ + indicators = (fmi3Float64 * ni)() + status = self._fmi3GetEventIndicators( + self._instance, + indicators, + ni, + ) + _check_status("fmi3GetEventIndicators", status) + return list(indicators) + + def get_continuous_states(self, nx: int) -> list[float]: + """Get continuous state values. + + Args: + nx: Number of continuous states. + """ + states = (fmi3Float64 * nx)() + status = self._fmi3GetContinuousStates( + self._instance, + states, + nx, + ) + _check_status("fmi3GetContinuousStates", status) + return list(states) + + def get_nominals_of_continuous_states(self, nx: int) -> list[float]: + """Get nominals of continuous states. + + Args: + nx: Number of continuous states. + """ + nominals = (fmi3Float64 * nx)() + status = self._fmi3GetNominalsOfContinuousStates( + self._instance, + nominals, + nx, + ) + _check_status("fmi3GetNominalsOfContinuousStates", status) + return list(nominals) + + def get_number_of_event_indicators(self) -> int: + """Query the number of event indicators.""" + n = c_size_t() + status = self._fmi3GetNumberOfEventIndicators( + self._instance, + byref(n), + ) + _check_status("fmi3GetNumberOfEventIndicators", status) + return n.value + + def get_number_of_continuous_states(self) -> int: + """Query the number of continuous states.""" + n = c_size_t() + status = self._fmi3GetNumberOfContinuousStates( + self._instance, + byref(n), + ) + _check_status("fmi3GetNumberOfContinuousStates", status) + return n.value + + # ------------------------------------------------------------------ + # Co-Simulation functions + # ------------------------------------------------------------------ + def enter_step_mode(self) -> Fmi3Status: + status = self._fmi3EnterStepMode(self._instance) + return _check_status("fmi3EnterStepMode", status) + + def get_output_derivatives( + self, + vrs: Sequence[int], + orders: Sequence[int], + ) -> list[float]: + n = len(vrs) + values = (fmi3Float64 * n)() + status = self._fmi3GetOutputDerivatives( + self._instance, + self._vr_array(vrs), + n, + self._int32_array(orders), + values, + n, + ) + _check_status("fmi3GetOutputDerivatives", status) + return list(values) + + class DoStepResult: + """Result of :meth:`do_step`.""" + + __slots__ = ( + "status", + "event_handling_needed", + "terminate_simulation", + "early_return", + "last_successful_time", + ) + + def __init__( + self, + status: Fmi3Status, + event_handling_needed: bool, + terminate_simulation: bool, + early_return: bool, + last_successful_time: float, + ) -> None: + self.status = status + self.event_handling_needed = event_handling_needed + self.terminate_simulation = terminate_simulation + self.early_return = early_return + self.last_successful_time = last_successful_time + + def do_step( + self, + current_communication_point: float, + communication_step_size: float, + no_set_fmu_state_prior: bool = True, + ) -> DoStepResult: + """Advance the Co-Simulation by one step. + + Returns: + A :class:`DoStepResult` containing the status and the four + output flags / values. + """ + event_handling_needed = fmi3Boolean(fmi3False) + terminate_simulation = fmi3Boolean(fmi3False) + early_return = fmi3Boolean(fmi3False) + last_successful_time = fmi3Float64(0.0) + + raw_status = self._fmi3DoStep( + self._instance, + current_communication_point, + communication_step_size, + no_set_fmu_state_prior, + byref(event_handling_needed), + byref(terminate_simulation), + byref(early_return), + byref(last_successful_time), + ) + status = _check_status("fmi3DoStep", raw_status) + return self.DoStepResult( + status=status, + event_handling_needed=bool(event_handling_needed.value), + terminate_simulation=bool(terminate_simulation.value), + early_return=bool(early_return.value), + last_successful_time=last_successful_time.value, + ) + + # ------------------------------------------------------------------ + # Scheduled Execution functions + # ------------------------------------------------------------------ + def activate_model_partition( + self, + clock_reference: int, + activation_time: float, + ) -> Fmi3Status: + status = self._fmi3ActivateModelPartition( + self._instance, + clock_reference, + activation_time, + ) + return _check_status("fmi3ActivateModelPartition", status) + + # ------------------------------------------------------------------ + # Context manager support + # ------------------------------------------------------------------ + def __enter__(self) -> Fmi3Slave: + return self + + def __exit__(self, *_: object) -> None: + self.free_instance() + if self._tmpdir is not None: + self._tmpdir.cleanup() + self._tmpdir = None + + def __del__(self) -> None: + if self._tmpdir is not None: + try: + self._tmpdir.cleanup() + except Exception: + pass diff --git a/tests/test_fmi3.py b/tests/test_fmi3.py new file mode 100644 index 0000000..279c6b0 --- /dev/null +++ b/tests/test_fmi3.py @@ -0,0 +1,1063 @@ +import platform + +import pytest +from fmuloader.fmi3 import ( + Fmi3Error, + Fmi3IntervalQualifier, + Fmi3Slave, + Fmi3Status, + Fmi3Type, + _platform_folder, + _shared_lib_extension, +) + +# ----------------------------------------------------------------------- +# All reference FMUs with their model identifier and instantiation token +# ----------------------------------------------------------------------- +BOUNCING_BALL = ( + "3.0/BouncingBall.fmu", + "BouncingBall", + "{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", +) +DAHLQUIST = ("3.0/Dahlquist.fmu", "Dahlquist", "{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}") +FEEDTHROUGH = ( + "3.0/Feedthrough.fmu", + "Feedthrough", + "{37B954F1-CC86-4D8F-B97F-C7C36F6670D2}", +) +STAIR = ("3.0/Stair.fmu", "Stair", "{BD403596-3166-4232-ABC2-132BDF73E644}") +RESOURCE = ("3.0/Resource.fmu", "Resource", "{7b9c2114-2ce5-4076-a138-2cbc69e069e5}") +VAN_DER_POL = ( + "3.0/VanDerPol.fmu", + "VanDerPol", + "{BD403596-3166-4232-ABC2-132BDF73E644}", +) +STATE_SPACE = ( + "3.0/StateSpace.fmu", + "StateSpace", + "{D773325B-AB94-4630-BF85-643EB24FCB78}", +) +CLOCKS = ("3.0/Clocks.fmu", "Clocks", "{C5F142BA-B849-42DA-B4A1-4745BFF3BE28}") + + +# ----------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------- +def _make_slave(reference_fmus_dir, fmu_tuple): + fmu_path, model_id, _token = fmu_tuple + return Fmi3Slave( + (reference_fmus_dir / fmu_path).absolute(), + model_identifier=model_id, + ) + + +def _instantiate_cs(slave, fmu_tuple, instance_name="test", **kwargs): + """Instantiate as Co-Simulation and return the slave.""" + _, _, token = fmu_tuple + slave.instantiate_co_simulation( + instance_name, + instantiation_token=token, + **kwargs, + ) + return slave + + +def _instantiate_me(slave, fmu_tuple, instance_name="test", **kwargs): + """Instantiate as Model Exchange and return the slave.""" + _, _, token = fmu_tuple + slave.instantiate_model_exchange( + instance_name, + instantiation_token=token, + **kwargs, + ) + return slave + + +# ======================================================================= +# Helper / enum unit tests (no FMU binary needed) +# ======================================================================= +class TestHelpers: + def test_shared_lib_extension(self): + ext = _shared_lib_extension() + system = platform.system() + if system == "Windows": + assert ext == ".dll" + elif system == "Darwin": + assert ext == ".dylib" + else: + assert ext == ".so" + + def test_platform_folder(self): + folder = _platform_folder() + system = platform.system() + if system == "Darwin": + assert folder in ("x86_64-darwin", "aarch64-darwin") + elif system == "Linux": + assert folder in ("x86_64-linux", "x86-linux", "aarch64-linux") + elif system == "Windows": + assert folder in ("x86_64-windows", "x86-windows") + + def test_fmi3_status_enum(self): + assert Fmi3Status.OK == 0 + assert Fmi3Status.WARNING == 1 + assert Fmi3Status.DISCARD == 2 + assert Fmi3Status.ERROR == 3 + assert Fmi3Status.FATAL == 4 + + def test_fmi3_type_enum(self): + assert Fmi3Type.MODEL_EXCHANGE == 0 + assert Fmi3Type.CO_SIMULATION == 1 + assert Fmi3Type.SCHEDULED_EXECUTION == 2 + + def test_fmi3_interval_qualifier_enum(self): + assert Fmi3IntervalQualifier.INTERVAL_NOT_YET_KNOWN == 0 + assert Fmi3IntervalQualifier.INTERVAL_UNCHANGED == 1 + assert Fmi3IntervalQualifier.INTERVAL_CHANGED == 2 + + def test_fmi3_error_exception(self): + error = Fmi3Error("testFunction", Fmi3Status.ERROR) + assert error.func_name == "testFunction" + assert error.status == Fmi3Status.ERROR + assert "testFunction" in str(error) + assert "ERROR" in str(error) + + +# ======================================================================= +# Basic loading & version for every CS/ME reference FMU +# ======================================================================= +@pytest.mark.parametrize( + "fmu_tuple", + [BOUNCING_BALL, DAHLQUIST, FEEDTHROUGH, STAIR, RESOURCE, VAN_DER_POL, STATE_SPACE], + ids=[ + "BouncingBall", + "Dahlquist", + "Feedthrough", + "Stair", + "Resource", + "VanDerPol", + "StateSpace", + ], +) +def test_fmi3_slave_basic(fmu_tuple, reference_fmus_dir): + """Load every CS/ME reference FMU and check version.""" + slave = _make_slave(reference_fmus_dir, fmu_tuple) + assert slave.get_version() == "3.0" + + _instantiate_cs(slave, fmu_tuple) + assert slave._instance is not None + slave.free_instance() + assert slave._instance is None + + +# ======================================================================= +# Co-Simulation lifecycle +# ======================================================================= +@pytest.mark.parametrize( + "fmu_tuple", + [BOUNCING_BALL, DAHLQUIST, FEEDTHROUGH, STAIR, RESOURCE, VAN_DER_POL, STATE_SPACE], + ids=[ + "BouncingBall", + "Dahlquist", + "Feedthrough", + "Stair", + "Resource", + "VanDerPol", + "StateSpace", + ], +) +def test_fmi3_cs_lifecycle(fmu_tuple, reference_fmus_dir): + """Full CS lifecycle: instantiate → init → step → terminate → free.""" + slave = _make_slave(reference_fmus_dir, fmu_tuple) + _instantiate_cs(slave, fmu_tuple) + + status = slave.enter_initialization_mode(start_time=0.0, stop_time=1.0) + assert status == Fmi3Status.OK + + status = slave.exit_initialization_mode() + assert status == Fmi3Status.OK + + result = slave.do_step(0.0, 0.1) + assert result.status == Fmi3Status.OK + + status = slave.terminate() + assert status == Fmi3Status.OK + + slave.free_instance() + + +# ======================================================================= +# Context manager +# ======================================================================= +def test_fmi3_context_manager(reference_fmus_dir): + """Ensure context-manager exit frees the instance.""" + with _make_slave(reference_fmus_dir, BOUNCING_BALL) as slave: + assert slave.get_version() == "3.0" + _instantiate_cs(slave, BOUNCING_BALL) + assert slave._instance is not None + # After __exit__, instance should be freed + assert slave._instance is None + + +# ======================================================================= +# BouncingBall +# ======================================================================= +class TestBouncingBall: + def test_co_simulation(self, reference_fmus_dir): + """Run a BouncingBall CS simulation and check dynamics.""" + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_cs(slave, BOUNCING_BALL, "bb_cs") + + slave.enter_initialization_mode(start_time=0.0, stop_time=3.0) + + vr_h = 1 + h0 = slave.get_float64([vr_h]) + assert h0[0] == pytest.approx(1.0) + + slave.exit_initialization_mode() + + # Simulate for 1 s + t = 0.0 + dt = 0.01 + for _ in range(100): + slave.do_step(t, dt) + t += dt + + h_final = slave.get_float64([vr_h]) + # Ball should have dropped and bounced + assert h_final[0] != pytest.approx(1.0, abs=0.01) + + slave.terminate() + slave.free_instance() + + def test_model_exchange(self, reference_fmus_dir): + """Run a BouncingBall ME simulation.""" + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_me(slave, BOUNCING_BALL, "bb_me") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + # Event iteration + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + + nx = slave.get_number_of_continuous_states() + assert nx == 2 # h, v + + ni = slave.get_number_of_event_indicators() + assert ni >= 1 # At least the ground contact indicator + + states = slave.get_continuous_states(nx) + assert len(states) == nx + + derivs = slave.get_continuous_state_derivatives(nx) + assert len(derivs) == nx + assert all(isinstance(d, float) for d in derivs) + + nominals = slave.get_nominals_of_continuous_states(nx) + assert len(nominals) == nx + + slave.terminate() + slave.free_instance() + + def test_parameter_tuning(self, reference_fmus_dir): + """Set parameters g and e before initialisation.""" + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_cs(slave, BOUNCING_BALL, "bb_tuned") + + slave.enter_initialization_mode(start_time=0.0, stop_time=3.0) + + vr_g, vr_e = 5, 6 + slave.set_float64([vr_g], [-9.81]) + slave.set_float64([vr_e], [0.8]) + + g_vals = slave.get_float64([vr_g]) + e_vals = slave.get_float64([vr_e]) + assert g_vals[0] == pytest.approx(-9.81) + assert e_vals[0] == pytest.approx(0.8) + + slave.exit_initialization_mode() + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Dahlquist +# ======================================================================= +class TestDahlquist: + """Dahlquist test equation: dx/dt = -k*x.""" + + def test_co_simulation(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, DAHLQUIST) + _instantiate_cs(slave, DAHLQUIST, "dq_cs") + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + + vr_x, vr_k = 1, 3 + x0 = slave.get_float64([vr_x]) + k0 = slave.get_float64([vr_k]) + assert x0[0] == pytest.approx(1.0) + assert k0[0] == pytest.approx(1.0) + + slave.exit_initialization_mode() + + t = 0.0 + dt = 0.1 + for _ in range(10): + slave.do_step(t, dt) + t += dt + + x_final = slave.get_float64([vr_x]) + assert x_final[0] != pytest.approx(1.0, abs=0.01) + + slave.terminate() + slave.free_instance() + + def test_model_exchange(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, DAHLQUIST) + _instantiate_me(slave, DAHLQUIST, "dq_me") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + + nx = 1 + states = slave.get_continuous_states(nx) + assert states[0] == pytest.approx(1.0) + + slave.set_time(0.0) + derivs = slave.get_continuous_state_derivatives(nx) + assert len(derivs) == nx + + # Euler step + new_state = [states[0] + 0.1 * derivs[0]] + slave.set_continuous_states(new_state) + slave.set_time(0.1) + + enter_event, terminate = slave.completed_integrator_step() + assert isinstance(enter_event, bool) + assert isinstance(terminate, bool) + + slave.terminate() + slave.free_instance() + + def test_fmu_state_get_set(self, reference_fmus_dir): + """Save / restore FMU state.""" + slave = _make_slave(reference_fmus_dir, DAHLQUIST) + _instantiate_cs(slave, DAHLQUIST, "dq_state") + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave.exit_initialization_mode() + + slave.do_step(0.0, 0.1) + slave.do_step(0.1, 0.1) + + vr_x = 1 + x_before = slave.get_float64([vr_x])[0] + + state = slave.get_fmu_state() + assert state is not None + + slave.do_step(0.2, 0.1) + slave.do_step(0.3, 0.1) + x_after = slave.get_float64([vr_x])[0] + assert x_after != pytest.approx(x_before, abs=1e-12) + + slave.set_fmu_state(state) + x_restored = slave.get_float64([vr_x])[0] + assert x_restored == pytest.approx(x_before, abs=1e-10) + + slave.free_fmu_state(state) + slave.terminate() + slave.free_instance() + + def test_state_serialization(self, reference_fmus_dir): + """Serialize / deserialize FMU state.""" + slave = _make_slave(reference_fmus_dir, DAHLQUIST) + _instantiate_cs(slave, DAHLQUIST, "dq_ser") + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave.exit_initialization_mode() + + slave.do_step(0.0, 0.1) + slave.do_step(0.1, 0.1) + + vr_x = 1 + x_original = slave.get_float64([vr_x])[0] + + state = slave.get_fmu_state() + serialized = slave.serialize_fmu_state(state) + assert isinstance(serialized, bytes) + assert len(serialized) > 0 + slave.free_fmu_state(state) + + # Advance further + slave.do_step(0.2, 0.1) + x_changed = slave.get_float64([vr_x])[0] + assert x_changed != pytest.approx(x_original, abs=1e-12) + + # Restore via deserialization + restored = slave.deserialize_fmu_state(serialized) + slave.set_fmu_state(restored) + x_restored = slave.get_float64([vr_x])[0] + assert x_restored == pytest.approx(x_original, abs=1e-10) + + slave.free_fmu_state(restored) + slave.terminate() + slave.free_instance() + + def test_multiple_instances(self, reference_fmus_dir): + """Run two Dahlquist instances with different k in parallel.""" + slave1 = _make_slave(reference_fmus_dir, DAHLQUIST) + _instantiate_cs(slave1, DAHLQUIST, "dq1") + + slave2 = _make_slave(reference_fmus_dir, DAHLQUIST) + _instantiate_cs(slave2, DAHLQUIST, "dq2") + + vr_x, vr_k = 1, 3 + + # slave1: default k=1 + slave1.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave1.exit_initialization_mode() + + # slave2: k=2 + slave2.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave2.set_float64([vr_k], [2.0]) + slave2.exit_initialization_mode() + + for i in range(5): + slave1.do_step(i * 0.1, 0.1) + slave2.do_step(i * 0.1, 0.1) + + x1 = slave1.get_float64([vr_x])[0] + x2 = slave2.get_float64([vr_x])[0] + # Different k → different trajectory + assert x1 != pytest.approx(x2, abs=1e-6) + + slave1.terminate() + slave1.free_instance() + slave2.terminate() + slave2.free_instance() + + +# ======================================================================= +# Feedthrough – all FMI 3.0 data types +# ======================================================================= +class TestFeedthrough: + """Feedthrough passes inputs directly to outputs. + + Covers Float32, Float64, Int8/16/32/64, UInt8/16/32/64, + Boolean, String, and Binary types. + """ + + def _setup(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, FEEDTHROUGH) + _instantiate_cs(slave, FEEDTHROUGH, "ft") + slave.enter_initialization_mode(start_time=0.0, stop_time=2.0) + return slave + + def test_float32(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 1, 2 # Float32 continuous + slave.set_float32([vr_in], [3.14]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_float32([vr_out]) + assert out[0] == pytest.approx(3.14, abs=0.01) + slave.terminate() + slave.free_instance() + + def test_float64(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 7, 8 # Float64 continuous + slave.set_float64([vr_in], [2.71828]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_float64([vr_out]) + assert out[0] == pytest.approx(2.71828, abs=1e-4) + slave.terminate() + slave.free_instance() + + def test_int8(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 11, 12 + slave.set_int8([vr_in], [42]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_int8([vr_out]) + assert out[0] == 42 + slave.terminate() + slave.free_instance() + + def test_uint8(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 13, 14 + slave.set_uint8([vr_in], [200]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_uint8([vr_out]) + assert out[0] == 200 + slave.terminate() + slave.free_instance() + + def test_int16(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 15, 16 + slave.set_int16([vr_in], [-1234]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_int16([vr_out]) + assert out[0] == -1234 + slave.terminate() + slave.free_instance() + + def test_uint16(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 17, 18 + slave.set_uint16([vr_in], [50000]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_uint16([vr_out]) + assert out[0] == 50000 + slave.terminate() + slave.free_instance() + + def test_int32(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 19, 20 + slave.set_int32([vr_in], [42]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_int32([vr_out]) + assert out[0] == 42 + slave.terminate() + slave.free_instance() + + def test_uint32(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 21, 22 + slave.set_uint32([vr_in], [123456]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_uint32([vr_out]) + assert out[0] == 123456 + slave.terminate() + slave.free_instance() + + def test_int64(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 23, 24 + slave.set_int64([vr_in], [-999999]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_int64([vr_out]) + assert out[0] == -999999 + slave.terminate() + slave.free_instance() + + def test_uint64(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 25, 26 + slave.set_uint64([vr_in], [18446744073709]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_uint64([vr_out]) + assert out[0] == 18446744073709 + slave.terminate() + slave.free_instance() + + def test_boolean(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 27, 28 + slave.set_boolean([vr_in], [True]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_boolean([vr_out]) + assert out[0] is True + slave.terminate() + slave.free_instance() + + def test_string(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 29, 30 + slave.set_string([vr_in], ["Hello FMI3"]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_string([vr_out]) + assert out[0] == "Hello FMI3" + slave.terminate() + slave.free_instance() + + def test_binary(self, reference_fmus_dir): + slave = self._setup(reference_fmus_dir) + vr_in, vr_out = 31, 32 + payload = b"\xde\xad\xbe\xef" + slave.set_binary([vr_in], [payload]) + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + out = slave.get_binary([vr_out]) + assert out[0] == payload + slave.terminate() + slave.free_instance() + + def test_all_types_combined(self, reference_fmus_dir): + """Set one of each type and verify after a step.""" + slave = self._setup(reference_fmus_dir) + + slave.set_float32([1], [1.5]) + slave.set_float64([7], [2.5]) + slave.set_int8([11], [10]) + slave.set_uint8([13], [20]) + slave.set_int16([15], [300]) + slave.set_uint16([17], [400]) + slave.set_int32([19], [500]) + slave.set_uint32([21], [600]) + slave.set_int64([23], [700]) + slave.set_uint64([25], [800]) + slave.set_boolean([27], [True]) + slave.set_string([29], ["combo"]) + slave.set_binary([31], [b"\x01\x02"]) + + slave.exit_initialization_mode() + slave.do_step(0.0, 0.1) + + assert slave.get_float32([2])[0] == pytest.approx(1.5, abs=0.01) + assert slave.get_float64([8])[0] == pytest.approx(2.5, abs=1e-6) + assert slave.get_int8([12])[0] == 10 + assert slave.get_uint8([14])[0] == 20 + assert slave.get_int16([16])[0] == 300 + assert slave.get_uint16([18])[0] == 400 + assert slave.get_int32([20])[0] == 500 + assert slave.get_uint32([22])[0] == 600 + assert slave.get_int64([24])[0] == 700 + assert slave.get_uint64([26])[0] == 800 + assert slave.get_boolean([28])[0] is True + assert slave.get_string([30])[0] == "combo" + assert slave.get_binary([32])[0] == b"\x01\x02" + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Stair – discrete time events +# ======================================================================= +class TestStair: + def test_counter_increments(self, reference_fmus_dir): + """Stair counter should increment over time.""" + slave = _make_slave(reference_fmus_dir, STAIR) + _instantiate_cs(slave, STAIR, "stair") + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + + vr_counter = 1 + initial = slave.get_int32([vr_counter]) + assert initial[0] == 1 + + slave.exit_initialization_mode() + + t = 0.0 + dt = 0.2 + for _ in range(25): # 5 seconds worth + slave.do_step(t, dt) + t += dt + + counter = slave.get_int32([vr_counter])[0] + assert counter > 1 + assert counter <= 10 + + slave.terminate() + slave.free_instance() + + def test_model_exchange(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, STAIR) + _instantiate_me(slave, STAIR, "stair_me") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + assert isinstance(result.discrete_states_need_update, bool) + assert isinstance(result.next_event_time_defined, bool) + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Resource – reads from the resources/ directory +# ======================================================================= +class TestResource: + def test_resource_file_loading(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, RESOURCE) + _instantiate_cs(slave, RESOURCE, "res") + + slave.enter_initialization_mode(start_time=0.0, stop_time=1.0) + slave.exit_initialization_mode() + + vr_y = 1 + y = slave.get_int32([vr_y]) + assert len(y) == 1 + assert isinstance(y[0], int) + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# VanDerPol +# ======================================================================= +class TestVanDerPol: + def test_co_simulation(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, VAN_DER_POL) + _instantiate_cs(slave, VAN_DER_POL, "vdp_cs") + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + + # Initial values: x0=2, x1=0 + assert slave.get_float64([1])[0] == pytest.approx(2.0) + assert slave.get_float64([3])[0] == pytest.approx(0.0) + + slave.exit_initialization_mode() + + t = 0.0 + dt = 0.01 + for _ in range(100): + slave.do_step(t, dt) + t += dt + + # Oscillation should have started + x0 = slave.get_float64([1])[0] + x1 = slave.get_float64([3])[0] + assert not ( + x0 == pytest.approx(2.0, abs=0.01) and x1 == pytest.approx(0.0, abs=0.01) + ) + + slave.terminate() + slave.free_instance() + + def test_model_exchange(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, VAN_DER_POL) + _instantiate_me(slave, VAN_DER_POL, "vdp_me") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + + nx = slave.get_number_of_continuous_states() + assert nx == 2 + + states = slave.get_continuous_states(nx) + assert states[0] == pytest.approx(2.0) + assert states[1] == pytest.approx(0.0) + + slave.set_time(0.0) + derivs = slave.get_continuous_state_derivatives(nx) + assert len(derivs) == 2 + + slave.terminate() + slave.free_instance() + + def test_directional_derivative(self, reference_fmus_dir): + """VanDerPol provides directional derivatives.""" + slave = _make_slave(reference_fmus_dir, VAN_DER_POL) + _instantiate_me(slave, VAN_DER_POL, "vdp_dd") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + slave.set_time(0.0) + + # Directional derivative: ∂der(x)/∂x with seed=[1,0] + # unknowns = der(x0), der(x1) → VR 2, 4 + # knowns = x0, x1 → VR 1, 3 + sensitivity = slave.get_directional_derivative( + unknowns=[2, 4], + knowns=[1, 3], + seed=[1.0, 0.0], + ) + assert len(sensitivity) == 2 + assert all(isinstance(s, float) for s in sensitivity) + + slave.terminate() + slave.free_instance() + + def test_adjoint_derivative(self, reference_fmus_dir): + """VanDerPol provides adjoint derivatives.""" + slave = _make_slave(reference_fmus_dir, VAN_DER_POL) + _instantiate_me(slave, VAN_DER_POL, "vdp_ad") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + slave.set_time(0.0) + + # Adjoint derivative: unknowns VR 2,4; knowns VR 1,3 + sensitivity = slave.get_adjoint_derivative( + unknowns=[2, 4], + knowns=[1, 3], + seed=[1.0, 0.0], + ) + assert len(sensitivity) == 2 + assert all(isinstance(s, float) for s in sensitivity) + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# StateSpace – array variables +# ======================================================================= +class TestStateSpace: + def test_co_simulation(self, reference_fmus_dir): + """StateSpace CS: set u, step, read y.""" + slave = _make_slave(reference_fmus_dir, STATE_SPACE) + _instantiate_cs(slave, STATE_SPACE, "ss_cs") + + slave.enter_initialization_mode(start_time=0.0, stop_time=1.0) + + # u (VR 9) is an array with 3 elements, set via Float64 + # Array VRs in FMI 3.0 are accessed by repeating the single VR + # but the stride is handled by nValues parameter. + # For Reference-FMUs implementation, VR 9 covers 3 values. + slave.exit_initialization_mode() + + result = slave.do_step(0.0, 0.1) + assert result.status == Fmi3Status.OK + + slave.terminate() + slave.free_instance() + + def test_model_exchange(self, reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, STATE_SPACE) + _instantiate_me(slave, STATE_SPACE, "ss_me") + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + + nx = slave.get_number_of_continuous_states() + assert nx == 3 # n=3 by default + + states = slave.get_continuous_states(nx) + assert len(states) == nx + + derivs = slave.get_continuous_state_derivatives(nx) + assert len(derivs) == nx + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Clocks – Scheduled Execution +# ======================================================================= +class TestClocks: + def test_instantiate_scheduled_execution(self, reference_fmus_dir): + """Clocks is SE-only – verify instantiation and activation.""" + slave = _make_slave(reference_fmus_dir, CLOCKS) + _, _, token = CLOCKS + slave.instantiate_scheduled_execution( + "clocks_se", + instantiation_token=token, + ) + assert slave._instance is not None + + slave.enter_initialization_mode(start_time=0.0) + + # Read initial tick counts (should be 0) + ticks = slave.get_int32([2001]) # inClock1Ticks + assert isinstance(ticks[0], int) + + slave.exit_initialization_mode() + + # Activate a model partition for inClock1 + status = slave.activate_model_partition( + clock_reference=1001, + activation_time=0.0, + ) + assert status == Fmi3Status.OK + + # After activation, tick count should have incremented + ticks_after = slave.get_int32([2001]) + assert isinstance(ticks_after[0], int) + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Reset +# ======================================================================= +@pytest.mark.parametrize( + "fmu_tuple", + [BOUNCING_BALL, DAHLQUIST, FEEDTHROUGH, STAIR, VAN_DER_POL], + ids=["BouncingBall", "Dahlquist", "Feedthrough", "Stair", "VanDerPol"], +) +def test_fmi3_reset(fmu_tuple, reference_fmus_dir): + """Reset should return the FMU to a fresh state.""" + slave = _make_slave(reference_fmus_dir, fmu_tuple) + _instantiate_cs(slave, fmu_tuple) + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave.exit_initialization_mode() + + slave.do_step(0.0, 0.1) + slave.do_step(0.1, 0.1) + + status = slave.reset() + assert status == Fmi3Status.OK + + slave.free_instance() + + +# ======================================================================= +# Debug logging +# ======================================================================= +def test_fmi3_set_debug_logging(reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_cs(slave, BOUNCING_BALL, "bb_log", logging_on=True) + + status = slave.set_debug_logging(True, categories=["logEvents"]) + assert status == Fmi3Status.OK + + status = slave.set_debug_logging(False) + assert status == Fmi3Status.OK + + slave.free_instance() + + +# ======================================================================= +# Error handling +# ======================================================================= +def test_fmi3_bad_token(reference_fmus_dir): + """Instantiation with a wrong token should raise or return NULL.""" + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + with pytest.raises(RuntimeError): + slave.instantiate_co_simulation( + "bad", + instantiation_token="{00000000-0000-0000-0000-000000000000}", + ) + + +# ======================================================================= +# DoStep output fields +# ======================================================================= +def test_fmi3_do_step_result_fields(reference_fmus_dir): + """Verify all DoStepResult fields are populated.""" + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_cs(slave, BOUNCING_BALL) + + slave.enter_initialization_mode(start_time=0.0, stop_time=10.0) + slave.exit_initialization_mode() + + result = slave.do_step(0.0, 0.1) + + assert isinstance(result.status, Fmi3Status) + assert isinstance(result.event_handling_needed, bool) + assert isinstance(result.terminate_simulation, bool) + assert isinstance(result.early_return, bool) + assert isinstance(result.last_successful_time, float) + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# UpdateDiscreteStates output fields +# ======================================================================= +def test_fmi3_update_discrete_states_fields(reference_fmus_dir): + """Verify all UpdateDiscreteStatesResult fields.""" + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_me(slave, BOUNCING_BALL) + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + + assert isinstance(result.discrete_states_need_update, bool) + assert isinstance(result.terminate_simulation, bool) + assert isinstance(result.nominals_of_continuous_states_changed, bool) + assert isinstance(result.values_of_continuous_states_changed, bool) + assert isinstance(result.next_event_time_defined, bool) + assert isinstance(result.next_event_time, float) + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Tolerance in enter_initialization_mode +# ======================================================================= +def test_fmi3_init_with_tolerance(reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_cs(slave, BOUNCING_BALL) + + status = slave.enter_initialization_mode( + start_time=0.0, + stop_time=10.0, + tolerance=1e-6, + ) + assert status == Fmi3Status.OK + + status = slave.exit_initialization_mode() + assert status == Fmi3Status.OK + + slave.terminate() + slave.free_instance() + + +# ======================================================================= +# Event indicators (ME) +# ======================================================================= +def test_fmi3_event_indicators(reference_fmus_dir): + slave = _make_slave(reference_fmus_dir, BOUNCING_BALL) + _instantiate_me(slave, BOUNCING_BALL) + + slave.enter_initialization_mode(start_time=0.0) + slave.exit_initialization_mode() + + result = slave.update_discrete_states() + while result.discrete_states_need_update: + result = slave.update_discrete_states() + + slave.enter_continuous_time_mode() + + ni = slave.get_number_of_event_indicators() + assert ni >= 1 + + indicators = slave.get_event_indicators(ni) + assert len(indicators) == ni + assert all(isinstance(v, float) for v in indicators) + + slave.terminate() + slave.free_instance()