diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index 9db3da33..7ec6d072 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -13,7 +13,7 @@ import queue import textwrap import threading -from typing import Any, cast, Optional +from typing import Any, cast, Optional, Union import warnings import xml.etree.ElementTree as ET @@ -21,6 +21,7 @@ import re +from OMPython.OMBase import ModelicaSystemBase, ModelicaSystemError from OMPython.OMCSession import ( OMCSessionException, OMCSessionRunData, @@ -33,12 +34,6 @@ logger = logging.getLogger(__name__) -class ModelicaSystemError(Exception): - """ - Exception used in ModelicaSystem and ModelicaSystemCmd classes. - """ - - @dataclass class LinearizationResult: """Modelica model linearization results. @@ -112,8 +107,7 @@ def __init__( # dictionaries of command line arguments for the model executable self._args: dict[str, str | None] = {} - # 'override' argument needs special handling, as it is a dict on its own saved as dict elements following the - # structure: 'key' => 'key=value' + # 'override' argument needs special handling self._arg_override: dict[str, str] = {} def arg_set( @@ -137,7 +131,6 @@ def override2str( """ Convert a value for 'override' to a string taking into account differences between Modelica and Python. """ - # check oval for any string representations of numbers (or bool) and convert these to Python representations if isinstance(oval, str): try: oval_evaluated = ast.literal_eval(oval) @@ -295,7 +288,7 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | n return simargs -class ModelicaSystem: +class ModelicaSystem(ModelicaSystemBase): """ Class to simulate a Modelica model using OpenModelica via OMCSession. """ @@ -321,26 +314,9 @@ def __init__( unspecified, a new local session will be created. """ - self._quantities: list[dict[str, Any]] = [] - self._params: dict[str, str] = {} # even numerical values are stored as str - self._inputs: dict[str, list[tuple[float, float]]] = {} - # _outputs values are str before simulate(), but they can be - # np.float64 after simulate(). - self._outputs: dict[str, Any] = {} - # same for _continuous - self._continuous: dict[str, Any] = {} - self._simulate_options: dict[str, str] = {} - self._override_variables: dict[str, str] = {} - self._simulate_options_override: dict[str, str] = {} - self._linearization_options: dict[str, str | float] = { - 'startTime': 0.0, - 'stopTime': 1.0, - 'stepSize': 0.002, - 'tolerance': 1e-8, - } - self._optimization_options = self._linearization_options | { - 'numberOfIntervals': 500, - } + # Initialize base class + super().__init__() + self._linearized_inputs: list[str] = [] # linearization input list self._linearized_outputs: list[str] = [] # linearization output list self._linearized_states: list[str] = [] # linearization states list @@ -590,113 +566,18 @@ def _xmlparse(self, xml_file: OMCPath): if not xml_file.is_file(): raise ModelicaSystemError(f"XML file not generated: {xml_file}") + # ModelicaSystem uses OMCPath to read text, then fromstring xml_content = xml_file.read_text() tree = ET.ElementTree(ET.fromstring(xml_content)) root = tree.getroot() - for attr in root.iter('DefaultExperiment'): - for key in ("startTime", "stopTime", "stepSize", "tolerance", - "solver", "outputFormat"): - self._simulate_options[key] = str(attr.get(key)) - - for sv in root.iter('ScalarVariable'): - translations = { - "alias": "alias", - "aliasvariable": "aliasVariable", - "causality": "causality", - "changeable": "isValueChangeable", - "description": "description", - "name": "name", - "variability": "variability", - } - - scalar: dict[str, Any] = {} - for key_dst, key_src in translations.items(): - val = sv.get(key_src) - scalar[key_dst] = None if val is None else str(val) - - ch = list(sv) - for att in ch: - scalar["start"] = att.get('start') - scalar["min"] = att.get('min') - scalar["max"] = att.get('max') - scalar["unit"] = att.get('unit') - - # save parameters in the corresponding class variables - if scalar["variability"] == "parameter": - if scalar["name"] in self._override_variables: - self._params[scalar["name"]] = self._override_variables[scalar["name"]] - else: - self._params[scalar["name"]] = scalar["start"] - if scalar["variability"] == "continuous": - self._continuous[scalar["name"]] = scalar["start"] - if scalar["causality"] == "input": - self._inputs[scalar["name"]] = scalar["start"] - if scalar["causality"] == "output": - self._outputs[scalar["name"]] = scalar["start"] - - self._quantities.append(scalar) - - def getQuantities(self, names: Optional[str | list[str]] = None) -> list[dict]: - """ - This method returns list of dictionaries. It displays details of - quantities such as name, value, changeable, and description. - - Examples: - >>> mod.getQuantities() - [ - { - 'alias': 'noAlias', - 'aliasvariable': None, - 'causality': 'local', - 'changeable': 'true', - 'description': None, - 'max': None, - 'min': None, - 'name': 'x', - 'start': '1.0', - 'unit': None, - 'variability': 'continuous', - }, - { - 'name': 'der(x)', - # ... - }, - # ... - ] - - >>> getQuantities("y") - [{ - 'name': 'y', # ... - }] - - >>> getQuantities(["y","x"]) - [ - { - 'name': 'y', # ... - }, - { - 'name': 'x', # ... - } - ] - """ - if names is None: - return self._quantities - if isinstance(names, str): - r = [x for x in self._quantities if x["name"] == names] - if r == []: - raise KeyError(names) - return r - - if isinstance(names, list): - return [x for y in names for x in self._quantities if x["name"] == y] - - raise ModelicaSystemError("Unhandled input for getQuantities()") + # Delegate population to base class + self._populate_from_xml_root(root) def getContinuous( self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str | numbers.Real] | list[str | numbers.Real]: + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, Union[str, numbers.Real]], list[Union[str, numbers.Real]]]: """Get values of continuous signals. If called before simulate(), the initial values are returned as @@ -732,12 +613,7 @@ def getContinuous( [np.float64(-0.24), np.float64(0.68)] """ if not self._simulated: - if names is None: - return self._continuous - if isinstance(names, str): - return [self._continuous[names]] - if isinstance(names, list): - return [self._continuous[x] for x in names] + return super().getContinuous(names) if names is None: for name in self._continuous: @@ -768,81 +644,10 @@ def getContinuous( raise ModelicaSystemError("Unhandled input for getContinous()") - def getParameters( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str] | list[str]: - """Get parameter values. - - Args: - names: Either None (default), a string with the parameter name, - or a list of parameter name strings. - Returns: - If `names` is None, a dict in the format - {parameter_name: parameter_value} is returned. - If `names` is a string, a single element list is returned. - If `names` is a list, a list with one value for each parameter name - in names is returned. - In all cases, parameter values are returned as strings. - - Examples: - >>> mod.getParameters() - {'Name1': '1.23', 'Name2': '4.56'} - >>> mod.getParameters("Name1") - ['1.23'] - >>> mod.getParameters(["Name1","Name2"]) - ['1.23', '4.56'] - """ - if names is None: - return self._params - if isinstance(names, str): - return [self._params[names]] - if isinstance(names, list): - return [self._params[x] for x in names] - - raise ModelicaSystemError("Unhandled input for getParameters()") - - def getInputs( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, list[tuple[float, float]]] | list[list[tuple[float, float]]]: - """Get values of input signals. - - Args: - names: Either None (default), a string with the input name, - or a list of input name strings. - Returns: - If `names` is None, a dict in the format - {input_name: input_value} is returned. - If `names` is a string, a single element list [input_value] is - returned. - If `names` is a list, a list with one value for each input name - in names is returned: [input1_values, input2_values, ...]. - In all cases, input values are returned as a list of tuples, - where the first element in the tuple is the time and the second - element is the input value. - - Examples: - >>> mod.getInputs() - {'Name1': [(0.0, 0.0), (1.0, 1.0)], 'Name2': None} - >>> mod.getInputs("Name1") - [[(0.0, 0.0), (1.0, 1.0)]] - >>> mod.getInputs(["Name1","Name2"]) - [[(0.0, 0.0), (1.0, 1.0)], None] - """ - if names is None: - return self._inputs - if isinstance(names, str): - return [self._inputs[names]] - if isinstance(names, list): - return [self._inputs[x] for x in names] - - raise ModelicaSystemError("Unhandled input for getInputs()") - def getOutputs( self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str | numbers.Real] | list[str | numbers.Real]: + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, Union[str, numbers.Real]], list[Union[str, numbers.Real]]]: """Get values of output signals. If called before simulate(), the initial values are returned as @@ -878,11 +683,7 @@ def getOutputs( [np.float64(-0.1234), np.float64(2.1)] """ if not self._simulated: - if names is None: - return self._outputs - if isinstance(names, str): - return [self._outputs[names]] - return [self._outputs[x] for x in names] + return super().getOutputs(names) if names is None: for name in self._outputs: @@ -910,47 +711,10 @@ def getOutputs( raise ModelicaSystemError("Unhandled input for getOutputs()") - def getSimulationOptions( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str] | list[str]: - """Get simulation options such as stopTime and tolerance. - - Args: - names: Either None (default), a string with the simulation option - name, or a list of option name strings. - - Returns: - If `names` is None, a dict in the format - {option_name: option_value} is returned. - If `names` is a string, a single element list [option_value] is - returned. - If `names` is a list, a list with one value for each option name - in names is returned: [option1_value, option2_value, ...]. - Option values are always returned as strings. - - Examples: - >>> mod.getSimulationOptions() - {'startTime': '0', 'stopTime': '1.234', - 'stepSize': '0.002', 'tolerance': '1.1e-08', 'solver': 'dassl', 'outputFormat': 'mat'} - >>> mod.getSimulationOptions("stopTime") - ['1.234'] - >>> mod.getSimulationOptions(["tolerance", "stopTime"]) - ['1.1e-08', '1.234'] - """ - if names is None: - return self._simulate_options - if isinstance(names, str): - return [self._simulate_options[names]] - if isinstance(names, list): - return [self._simulate_options[x] for x in names] - - raise ModelicaSystemError("Unhandled input for getSimulationOptions()") - def getLinearizationOptions( self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str | float] | list[str | float]: + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, Union[str, float]], list[Union[str, float]]]: """Get simulation options used for linearization. Args: @@ -985,44 +749,6 @@ def getLinearizationOptions( raise ModelicaSystemError("Unhandled input for getLinearizationOptions()") - def getOptimizationOptions( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str | float] | list[str | float]: - """Get simulation options used for optimization. - - Args: - names: Either None (default), a string with the optimization option - name, or a list of option name strings. - - Returns: - If `names` is None, a dict in the format - {option_name: option_value} is returned. - If `names` is a string, a single element list [option_value] is - returned. - If `names` is a list, a list with one value for each option name - in names is returned: [option1_value, option2_value, ...]. - Some option values are returned as float when first initialized, - but always as strings after setOptimizationOptions is used to - change them. - - Examples: - >>> mod.getOptimizationOptions() - {'startTime': 0.0, 'stopTime': 1.0, 'numberOfIntervals': 500, 'stepSize': 0.002, 'tolerance': 1e-08} - >>> mod.getOptimizationOptions("stopTime") - [1.0] - >>> mod.getOptimizationOptions(["tolerance", "stopTime"]) - [1e-08, 1.0] - """ - if names is None: - return self._optimization_options - if isinstance(names, str): - return [self._optimization_options[names]] - if isinstance(names, list): - return [self._optimization_options[x] for x in names] - - raise ModelicaSystemError("Unhandled input for getOptimizationOptions()") - def parse_om_version(self, version: str) -> tuple[int, int, int]: match = re.search(r"v?(\d+)\.(\d+)\.(\d+)", version) if not match: @@ -1056,7 +782,6 @@ def simulate_cmd( ------- An instance if ModelicaSystemCmd to run the requested simulation. """ - om_cmd = ModelicaSystemCmd( session=self._session, runpath=self.getWorkDirectory(), @@ -1141,7 +866,6 @@ def simulate( # using simargs mod.simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "override": {"e": 0.3, "g": 10}}) """ - if resultfile is None: # default result file generated by OM self._result_file = self.getWorkDirectory() / f"{self._model_name}_res.mat" @@ -1189,7 +913,6 @@ def plot( Plot a variable using OMC; this will work for local OMC usage only (OMCProcessLocal). The reason is that the plot is created by OMC which needs access to the local display. This is not the case for docker and WSL. """ - if not isinstance(self._session, OMCSessionLocal): raise ModelicaSystemError("Plot is using the OMC plot functionality; " "thus, it is only working if OMC is running locally!") @@ -1281,199 +1004,6 @@ def getSolutions( self.sendExpression("closeSimulationResultFile()") return np_res - @staticmethod - def _prepare_input_data( - input_args: Any, - input_kwargs: dict[str, Any], - ) -> dict[str, str]: - """ - Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. - """ - - def prepare_str(str_in: str) -> dict[str, str]: - str_in = str_in.replace(" ", "") - key_val_list: list[str] = str_in.split("=") - if len(key_val_list) != 2: - raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") - - input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} - - return input_data_from_str - - input_data: dict[str, str] = {} - - for input_arg in input_args: - if isinstance(input_arg, str): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - input_data = input_data | prepare_str(input_arg) - elif isinstance(input_arg, list): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - - for item in input_arg: - if not isinstance(item, str): - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") - input_data = input_data | prepare_str(item) - elif isinstance(input_arg, dict): - input_data = input_data | input_arg - else: - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") - - if len(input_kwargs): - for key, val in input_kwargs.items(): - # ensure all values are strings to align it on one type: dict[str, str] - if not isinstance(val, str): - # spaces have to be removed as setInput() could take list of tuples as input and spaces would - # result in an error on recreating the input data - str_val = str(val).replace(' ', '') - else: - str_val = val - if ' ' in key or ' ' in str_val: - raise ModelicaSystemError(f"Spaces not allowed in key/value pairs: {repr(key)} = {repr(val)}!") - input_data[key] = str_val - - return input_data - - def _set_method_helper( - self, - inputdata: dict[str, str], - classdata: dict[str, Any], - datatype: str, - overridedata: Optional[dict[str, str]] = None, - ) -> bool: - """ - Helper function for: - * setParameter() - * setContinuous() - * setSimulationOptions() - * setLinearizationOption() - * setOptimizationOption() - * setInputs() - - Parameters - ---------- - inputdata - string or list of string given by user - classdata - dict() containing the values of different variables (eg: parameter, continuous, simulation parameters) - datatype - type identifier (eg; continuous, parameter, simulation, linearization, optimization) - overridedata - dict() which stores the new override variables list, - """ - - for key, val in inputdata.items(): - if key not in classdata: - raise ModelicaSystemError(f"Invalid variable for type {repr(datatype)}: {repr(key)}") - - if datatype == "parameter" and not self.isParameterChangeable(key): - raise ModelicaSystemError(f"It is not possible to set the parameter {repr(key)}. It seems to be " - "structural, final, protected, evaluated or has a non-constant binding. " - "Use sendExpression(...) and rebuild the model using buildModel() API; " - "command to set the parameter before rebuilding the model: " - "sendExpression(\"setParameterValue(" - f"{self._model_name}, {key}, {val if val is not None else ''}" - ")\").") - - classdata[key] = val - if overridedata is not None: - overridedata[key] = val - - return True - - def isParameterChangeable( - self, - name: str, - ) -> bool: - """ - Return if the parameter defined by name is changeable (= non-structural; can be modified without the need to - recompile the model). - """ - q = self.getQuantities(name) - if q[0]["changeable"] == "false": - return False - return True - - def setContinuous( - self, - *args: Any, - **kwargs: dict[str, Any], - ) -> bool: - """ - This method is used to set continuous values. It can be called: - with a sequence of continuous name and assigning corresponding values as arguments as show in the example below: - usage - >>> setContinuous("Name=value") # depreciated - >>> setContinuous(["Name1=value1","Name2=value2"]) # depreciated - - >>> setContinuous(Name1="value1", Name2="value2") - >>> param = {"Name1": "value1", "Name2": "value2"} - >>> setContinuous(**param) - """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) - - return self._set_method_helper( - inputdata=inputdata, - classdata=self._continuous, - datatype="continuous", - overridedata=self._override_variables) - - def setParameters( - self, - *args: Any, - **kwargs: dict[str, Any], - ) -> bool: - """ - This method is used to set parameter values. It can be called: - with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: - usage - >>> setParameters("Name=value") # depreciated - >>> setParameters(["Name1=value1","Name2=value2"]) # depreciated - - >>> setParameters(Name1="value1", Name2="value2") - >>> param = {"Name1": "value1", "Name2": "value2"} - >>> setParameters(**param) - """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) - - return self._set_method_helper( - inputdata=inputdata, - classdata=self._params, - datatype="parameter", - overridedata=self._override_variables) - - def setSimulationOptions( - self, - *args: Any, - **kwargs: dict[str, Any], - ) -> bool: - """ - This method is used to set simulation options. It can be called: - with a sequence of simulation options name and assigning corresponding values as arguments as show in the - example below: - usage - >>> setSimulationOptions("Name=value") # depreciated - >>> setSimulationOptions(["Name1=value1","Name2=value2"]) # depreciated - - >>> setSimulationOptions(Name1="value1", Name2="value2") - >>> param = {"Name1": "value1", "Name2": "value2"} - >>> setSimulationOptions(**param) - """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) - - return self._set_method_helper( - inputdata=inputdata, - classdata=self._simulate_options, - datatype="simulation-option", - overridedata=self._simulate_options_override) - def setLinearizationOptions( self, *args: Any, @@ -1499,31 +1029,6 @@ def setLinearizationOptions( datatype="Linearization-option", overridedata=None) - def setOptimizationOptions( - self, - *args: Any, - **kwargs: dict[str, Any], - ) -> bool: - """ - This method is used to set optimization options. It can be called: - with a sequence of optimization options name and assigning corresponding values as arguments as show in the - example below: - usage - >>> setOptimizationOptions("Name=value") # depreciated - >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) # depreciated - - >>> setOptimizationOptions(Name1="value1", Name2="value2") - >>> param = {"Name1": "value1", "Name2": "value2"} - >>> setOptimizationOptions(**param) - """ - inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) - - return self._set_method_helper( - inputdata=inputdata, - classdata=self._optimization_options, - datatype="optimization-option", - overridedata=None) - def setInputs( self, *args: Any, @@ -1655,7 +1160,6 @@ def convertMo2Fmu( includeResources=True) '/tmp/tmpmhfx9umo/CauerLowPassAnalog.fmu' """ - if fileNamePrefix is None: if self._model_name is None: fileNamePrefix = "" @@ -1686,7 +1190,6 @@ def convertFmu2Mo( usage >>> convertFmu2Mo("c:/BouncingBall.Fmu") """ - fmu_path = self._session.omcpath(fmu) if not fmu_path.is_file(): @@ -1941,9 +1444,7 @@ def run_doe(): ``` """ - - # Dictionary keys used in simulation dict (see _sim_dict or get_doe()). These dict keys contain a space and, thus, - # cannot be used as OM variable identifiers. They are defined here as reference for any evaluation of the data. + # Dictionary keys used in simulation dict DICT_ID_STRUCTURE: str = 'ID structure' DICT_ID_NON_STRUCTURE: str = 'ID non-structure' DICT_RESULT_AVAILABLE: str = 'result available' @@ -1960,7 +1461,6 @@ def __init__( omhome: Optional[str] = None, session: Optional[OMCSession] = None, # simulation specific input - # TODO: add more settings (simulation options, input options, ...) simargs: Optional[dict[str, Optional[str | dict[str, str] | numbers.Number]]] = None, # DoE specific inputs resultpath: Optional[str | os.PathLike] = None, @@ -2139,7 +1639,6 @@ def simulate( Returns True if all simulations were done successfully, else False. """ - if self._doe_cmd is None or self._doe_def is None: raise ModelicaSystemError("DoE preparation missing - call prepare() first!") @@ -2208,8 +1707,6 @@ def worker(worker_id, task_queue): resultfile = self._resultpath / resultfilename # include check for an empty (=> 0B) result file which indicates a crash of the model executable - # see: https://github.com/OpenModelica/OMPython/issues/261 - # https://github.com/OpenModelica/OpenModelica/issues/13829 if resultfile.is_file() and resultfile.size() > 0: self._doe_def[resultfilename][self.DICT_RESULT_AVAILABLE] = True doe_def_done += 1 diff --git a/OMPython/OMBase.py b/OMPython/OMBase.py new file mode 100644 index 00000000..8c056aec --- /dev/null +++ b/OMPython/OMBase.py @@ -0,0 +1,553 @@ +import xml.etree.ElementTree as ET +import logging +import numbers +import warnings +from typing import Optional, Any, Union + +# define logger using the current module name as ID +logger = logging.getLogger(__name__) + + +class ModelicaSystemError(Exception): + """ + Exception used in ModelicaSystem and ModelicaSystemCmd classes. + """ + + +class ModelicaSystemBase(object): + """ + Base class for ModelicaSystem and ModelicaSystemRunner containing common + data structures and methods for handling model quantities, parameters, and options. + """ + + def __init__(self) -> None: + self._quantities: list[dict[str, Any]] = [] + self._params: dict[str, str] = {} # even numerical values are stored as str + self._inputs: dict[str, list[tuple[float, float]]] = {} + self._outputs: dict[str, Any] = {} + self._continuous: dict[str, Any] = {} + self._simulate_options: dict[str, str] = {} + self._override_variables: dict[str, str] = {} + self._simulate_options_override: dict[str, str] = {} + self._linearization_options: dict[str, Union[str, float]] = { + 'startTime': 0.0, + 'stopTime': 1.0, + 'stepSize': 0.002, + 'tolerance': 1e-8, + } + self._optimization_options = self._linearization_options.copy() + self._optimization_options.update({'numberOfIntervals': 500}) + self._model_name: Optional[str] = None + self._variable_filter: Optional[str] = None + + def _populate_from_xml_root(self, root: ET.Element): + """ + Common XML parsing logic to populate internal data structures from the model description. + """ + for attr in root.iter('DefaultExperiment'): + for key in ("startTime", "stopTime", "stepSize", "tolerance", + "solver", "outputFormat"): + val = attr.get(key) + if val is not None: + self._simulate_options[key] = str(val) + + for sv in root.iter('ScalarVariable'): + translations = { + "alias": "alias", + "aliasvariable": "aliasVariable", + "causality": "causality", + "changeable": "isValueChangeable", + "description": "description", + "name": "name", + "variability": "variability", + } + + scalar: dict[str, Any] = {} + for key_dst, key_src in translations.items(): + val = sv.get(key_src) + scalar[key_dst] = None if val is None else str(val) + + ch = list(sv) + for att in ch: + scalar["start"] = att.get('start') + scalar["min"] = att.get('min') + scalar["max"] = att.get('max') + scalar["unit"] = att.get('unit') + + # save parameters in the corresponding class variables + if scalar["variability"] == "parameter": + if scalar["name"] in self._override_variables: + self._params[scalar["name"]] = self._override_variables[scalar["name"]] + else: + self._params[scalar["name"]] = scalar["start"] + if scalar["variability"] == "continuous": + self._continuous[scalar["name"]] = scalar["start"] + if scalar["causality"] == "input": + self._inputs[scalar["name"]] = scalar["start"] + if scalar["causality"] == "output": + self._outputs[scalar["name"]] = scalar["start"] + + self._quantities.append(scalar) + + def getQuantities(self, names: Optional[Union[str, list[str]]] = None) -> list[dict]: + """ + This method returns list of dictionaries. It displays details of + quantities such as name, value, changeable, and description. + + Examples: + >>> mod.getQuantities() + [ + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'local', + 'changeable': 'true', + 'description': None, + 'max': None, + 'min': None, + 'name': 'x', + 'start': '1.0', + 'unit': None, + 'variability': 'continuous', + }, + { + 'name': 'der(x)', + # ... + }, + # ... + ] + + >>> getQuantities("y") + [{ + 'name': 'y', # ... + }] + + >>> getQuantities(["y","x"]) + [ + { + 'name': 'y', # ... + }, + { + 'name': 'x', # ... + } + ] + """ + if names is None: + return self._quantities + + if isinstance(names, str): + r = [x for x in self._quantities if x["name"] == names] + if r == []: + raise KeyError(names) + return r + + if isinstance(names, list): + return [x for y in names for x in self._quantities if x["name"] == y] + + raise ModelicaSystemError("Unhandled input for getQuantities()") + + def getParameters( + self, + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, str], list[str]]: + """Get parameter values. + + Args: + names: Either None (default), a string with the parameter name, + or a list of parameter name strings. + Returns: + If `names` is None, a dict in the format + {parameter_name: parameter_value} is returned. + If `names` is a string, a single element list is returned. + If `names` is a list, a list with one value for each parameter name + in names is returned. + In all cases, parameter values are returned as strings. + + Examples: + >>> mod.getParameters() + {'Name1': '1.23', 'Name2': '4.56'} + >>> mod.getParameters("Name1") + ['1.23'] + >>> mod.getParameters(["Name1","Name2"]) + ['1.23', '4.56'] + """ + if names is None: + return self._params + if isinstance(names, str): + return [self._params[names]] + if isinstance(names, list): + return [self._params[x] for x in names] + + raise ModelicaSystemError("Unhandled input for getParameters()") + + def getInputs( + self, + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, list[tuple[float, float]]], list[list[tuple[float, float]]]]: + """Get values of input signals. + + Args: + names: Either None (default), a string with the input name, + or a list of input name strings. + Returns: + If `names` is None, a dict in the format + {input_name: input_value} is returned. + If `names` is a string, a single element list [input_value] is + returned. + If `names` is a list, a list with one value for each input name + in names is returned: [input1_values, input2_values, ...]. + In all cases, input values are returned as a list of tuples, + where the first element in the tuple is the time and the second + element is the input value. + + Examples: + >>> mod.getInputs() + {'Name1': [(0.0, 0.0), (1.0, 1.0)], 'Name2': None} + >>> mod.getInputs("Name1") + [[(0.0, 0.0), (1.0, 1.0)]] + >>> mod.getInputs(["Name1","Name2"]) + [[(0.0, 0.0), (1.0, 1.0)], None] + """ + if names is None: + return self._inputs + if isinstance(names, str): + return [self._inputs[names]] + if isinstance(names, list): + return [self._inputs[x] for x in names] + + raise ModelicaSystemError("Unhandled input for getInputs()") + + def getOutputs( + self, + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, Union[str, numbers.Real]], list[Union[str, numbers.Real]]]: + """Get values of output signals. + + Returns the initial values as strings only. + + Args: + names: Either None (default), a string with the output name, + or a list of output name strings. + Returns: + If `names` is None, a dict in the format + {output_name: output_value} is returned. + If `names` is a string, a single element list [output_value] is + returned. + If `names` is a list, a list with one value for each output name + in names is returned: [output1_value, output2_value, ...]. + + Examples: + Before simulate(): + >>> mod.getOutputs() + {'out1': '-0.4', 'out2': '1.2'} + >>> mod.getOutputs("out1") + ['-0.4'] + >>> mod.getOutputs(["out1","out2"]) + ['-0.4', '1.2'] + """ + if names is None: + return self._outputs + if isinstance(names, str): + return [self._outputs[names]] + return [self._outputs[x] for x in names] + + def getContinuous( + self, + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, Union[str, numbers.Real]], list[Union[str, numbers.Real]]]: + """Get values of continuous signals.""" + if names is None: + return self._continuous + if isinstance(names, str): + return [self._continuous[names]] + if isinstance(names, list): + return [self._continuous[x] for x in names] + + raise ModelicaSystemError("Unhandled input for getContinuous()") + + def getSimulationOptions( + self, + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, str], list[str]]: + """Get simulation options such as stopTime and tolerance. + + Args: + names: Either None (default), a string with the simulation option + name, or a list of option name strings. + + Returns: + If `names` is None, a dict in the format + {option_name: option_value} is returned. + If `names` is a string, a single element list [option_value] is + returned. + If `names` is a list, a list with one value for each option name + in names is returned: [option1_value, option2_value, ...]. + Option values are always returned as strings. + + Examples: + >>> mod.getSimulationOptions() + {'startTime': '0', 'stopTime': '1.234', + 'stepSize': '0.002', 'tolerance': '1.1e-08', 'solver': 'dassl', 'outputFormat': 'mat'} + >>> mod.getSimulationOptions("stopTime") + ['1.234'] + >>> mod.getSimulationOptions(["tolerance", "stopTime"]) + ['1.1e-08', '1.234'] + """ + if names is None: + return self._simulate_options + if isinstance(names, str): + return [self._simulate_options[names]] + if isinstance(names, list): + return [self._simulate_options[x] for x in names] + + raise ModelicaSystemError("Unhandled input for getSimulationOptions()") + + def getOptimizationOptions( + self, + names: Optional[Union[str, list[str]]] = None, + ) -> Union[dict[str, Union[str, float]], list[Union[str, float]]]: + """Get simulation options used for optimization. + + Args: + names: Either None (default), a string with the optimization option + name, or a list of option name strings. + + Returns: + If `names` is None, a dict in the format + {option_name: option_value} is returned. + If `names` is a string, a single element list [option_value] is + returned. + If `names` is a list, a list with one value for each option name + in names is returned: [option1_value, option2_value, ...]. + Some option values are returned as float when first initialized, + but always as strings after setOptimizationOptions is used to + change them. + + Examples: + >>> mod.getOptimizationOptions() + {'startTime': 0.0, 'stopTime': 1.0, 'numberOfIntervals': 500, 'stepSize': 0.002, 'tolerance': 1e-08} + >>> mod.getOptimizationOptions("stopTime") + [1.0] + >>> mod.getOptimizationOptions(["tolerance", "stopTime"]) + [1e-08, 1.0] + """ + if names is None: + return self._optimization_options + if isinstance(names, str): + return [self._optimization_options[names]] + if isinstance(names, list): + return [self._optimization_options[x] for x in names] + + raise ModelicaSystemError("Unhandled input for getOptimizationOptions()") + + @staticmethod + def _prepare_input_data( + input_args: Any, + input_kwargs: dict[str, Any], + ) -> dict[str, str]: + """ + Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. + """ + + def prepare_str(str_in: str) -> dict[str, str]: + str_in = str_in.replace(" ", "") + key_val_list: list[str] = str_in.split("=") + if len(key_val_list) != 2: + raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") + + input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} + + return input_data_from_str + + input_data: dict[str, str] = {} + + for input_arg in input_args: + if isinstance(input_arg, str): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + input_data = input_data | prepare_str(input_arg) + elif isinstance(input_arg, list): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + + for item in input_arg: + if not isinstance(item, str): + raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") + input_data = input_data | prepare_str(item) + elif isinstance(input_arg, dict): + input_data = input_data | input_arg + else: + raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") + + if len(input_kwargs): + for key, val in input_kwargs.items(): + # ensure all values are strings to align it on one type: dict[str, str] + if not isinstance(val, str): + # spaces have to be removed as setInput() could take list of tuples as input and spaces would + # result in an error on recreating the input data + str_val = str(val).replace(' ', '') + else: + str_val = val + if ' ' in key or ' ' in str_val: + raise ModelicaSystemError(f"Spaces not allowed in key/value pairs: {repr(key)} = {repr(val)}!") + input_data[key] = str_val + + return input_data + + def _set_method_helper( + self, + inputdata: dict[str, str], + classdata: dict[str, Any], + datatype: str, + overridedata: Optional[dict[str, str]] = None, + ) -> bool: + """ + Helper function for set*() methods. + + Parameters + ---------- + inputdata + string or list of string given by user + classdata + dict() containing the values of different variables (eg: parameter, continuous, simulation parameters) + datatype + type identifier (eg; continuous, parameter, simulation, linearization, optimization) + overridedata + dict() which stores the new override variables list, + """ + + for key, val in inputdata.items(): + if key not in classdata: + raise ModelicaSystemError(f"Invalid variable for type {repr(datatype)}: {repr(key)}") + + if datatype == "parameter" and not self.isParameterChangeable(key): + raise ModelicaSystemError(f"It is not possible to set the parameter {repr(key)}. It seems to be " + "structural, final, protected, evaluated or has a non-constant binding. " + "Use sendExpression(...) and rebuild the model using buildModel() API; " + "command to set the parameter before rebuilding the model: " + "sendExpression(\"setParameterValue(" + f"{self._model_name}, {key}, {val if val is not None else ''}" + ")\").") + + classdata[key] = val + if overridedata is not None: + overridedata[key] = val + + return True + + def isParameterChangeable( + self, + name: str, + ) -> bool: + """ + Return if the parameter defined by name is changeable (= non-structural; can be modified without the need to + recompile the model). + """ + q = self.getQuantities(name) + if q[0]["changeable"] == "false": + return False + return True + + def setContinuous( + self, + *args: Any, + **kwargs: dict[str, Any], + ) -> bool: + """ + This method is used to set continuous values. It can be called: + with a sequence of continuous name and assigning corresponding values as arguments as show in the example below: + usage + >>> setContinuous("Name=value") # depreciated + >>> setContinuous(["Name1=value1","Name2=value2"]) # depreciated + + >>> setContinuous(Name1="value1", Name2="value2") + >>> param = {"Name1": "value1", "Name2": "value2"} + >>> setContinuous(**param) + """ + inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._continuous, + datatype="continuous", + overridedata=self._override_variables) + + def setParameters( + self, + *args: Any, + **kwargs: dict[str, Any], + ) -> bool: + """ + This method is used to set parameter values. It can be called: + with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: + usage + >>> setParameters("Name=value") # depreciated + >>> setParameters(["Name1=value1","Name2=value2"]) # depreciated + + >>> setParameters(Name1="value1", Name2="value2") + >>> param = {"Name1": "value1", "Name2": "value2"} + >>> setParameters(**param) + """ + inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._params, + datatype="parameter", + overridedata=self._override_variables) + + def setSimulationOptions( + self, + *args: Any, + **kwargs: dict[str, Any], + ) -> bool: + """ + This method is used to set simulation options. It can be called: + with a sequence of simulation options name and assigning corresponding values as arguments as show in the + example below: + usage + >>> setSimulationOptions("Name=value") # depreciated + >>> setSimulationOptions(["Name1=value1","Name2=value2"]) # depreciated + + >>> setSimulationOptions(Name1="value1", Name2="value2") + >>> param = {"Name1": "value1", "Name2": "value2"} + >>> setSimulationOptions(**param) + """ + inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._simulate_options, + datatype="simulation-option", + overridedata=self._simulate_options_override) + + def setOptimizationOptions( + self, + *args: Any, + **kwargs: dict[str, Any], + ) -> bool: + """ + This method is used to set optimization options. It can be called: + with a sequence of optimization options name and assigning corresponding values as arguments as show in the + example below: + usage + >>> setOptimizationOptions("Name=value") # depreciated + >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) # depreciated + + >>> setOptimizationOptions(Name1="value1", Name2="value2") + >>> param = {"Name1": "value1", "Name2": "value2"} + >>> setOptimizationOptions(**param) + """ + inputdata = self._prepare_input_data(input_args=args, input_kwargs=kwargs) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._optimization_options, + datatype="optimization-option", + overridedata=None) diff --git a/OMPython/OMRunner.py b/OMPython/OMRunner.py new file mode 100644 index 00000000..93f086e1 --- /dev/null +++ b/OMPython/OMRunner.py @@ -0,0 +1,150 @@ +import os +import stat +import platform +import subprocess +import shlex +from typing import Optional +import xml.etree.ElementTree as ET +import logging + +# Import base class and exception +from OMPython.OMBase import ModelicaSystemBase, ModelicaSystemError + +# define logger using the current module name as ID +logger = logging.getLogger(__name__) + + +class ModelicaSystemRunner(ModelicaSystemBase): + def __init__( + self, + runpath: str, + modelname: Optional[str] = None, + variableFilter: Optional[list] = None, + ) -> None: + if modelname is None: + raise ModelicaSystemError("Missing model name!") + + # Initialize base class + super().__init__() + + self.runpath = os.path.abspath(runpath) + + print(self.runpath) + exefile = os.path.join(self.runpath, modelname) + if not os.path.isfile(exefile): + raise ModelicaSystemError("Executable model file not found in {0}".format(exefile)) + + self.xmlFileName = modelname + "_init.xml" + self.xmlFile = os.path.join(self.runpath, self.xmlFileName) + self._model_name = modelname # Model class name + self.inputFlag = False # for model with input quantity + self.simulationFlag = False # if the model is simulated? + self.outputFlag = False + self.csvFile = '' # for storing inputs condition + self.resultfile = "" # for storing result file + self.variableFilter = variableFilter + self._xmlparse(xml_file=self.xmlFile) + self._variable_filter: Optional[str] = None + print("Init done") + + def _xmlparse(self, xml_file: str): + if not os.path.isfile(xml_file): + raise ModelicaSystemError(f"XML file not generated: {xml_file}") + + # OMRunner uses ET.parse directly on the file path + tree = ET.parse(xml_file) + root = tree.getroot() + + # Delegate population to base class + self._populate_from_xml_root(root) + + def simulate(self, resultfile=None, simflags=None, overrideaux=None): # 11 + """This method simulates model according to the simulation options.""" + if resultfile is None: + r = "" + self.resultfile = "".join([self._model_name, "_res.mat"]) + else: + r = " -r=" + resultfile + self.resultfile = resultfile + + # allow runtime simulation flags from user input + if simflags is None: + simflags = "" + else: + simflags = " " + simflags + + if (self._override_variables or self._simulate_options_override): + tmpdict = self._override_variables.copy() + tmpdict.update(self._simulate_options_override) + values1 = ','.join("%s=%s" % (key, val) for (key, val) in list(tmpdict.items())) + override = " -override=" + values1 + else: + override = "" + # add override flags not parameters or simulation options + if overrideaux: + if override: + override = override + "," + overrideaux + else: + override = " -override=" + overrideaux + + if (self.inputFlag): # if model has input quantities + for i in self._inputs: + val = self._inputs[i] + if val is None: + val = [ + (float(self._simulate_options["startTime"]), 0.0), + (float(self._simulate_options["stopTime"]), 0.0)] + self._inputs[i] = [ + (float(self._simulate_options["startTime"]), 0.0), + (float(self._simulate_options["stopTime"]), 0.0)] + if float(self._simulate_options["startTime"]) != val[0][0]: + print("!!! startTime not matched for Input ", i) + return + if float(self._simulate_options["stopTime"]) != val[-1][0]: + print("!!! stopTime not matched for Input ", i) + return + if val[0][0] < float(self._simulate_options["startTime"]): + print('Input time value is less than simulation startTime for inputs', i) + return + # self.__simInput() # create csv file # commented by Joerg + csvinput = " -csvInput=" + self.csvFile + else: + csvinput = "" + + if self.xmlFile is not None: + cwd_current = os.getcwd() + os.chdir(os.path.join(os.path.dirname(self.xmlFile))) + + if (platform.system() == "Windows"): + getExeFile = os.path.join(os.getcwd(), '{}.{}'.format(self._model_name, "exe")).replace("\\", "/") + else: + getExeFile = os.path.join(os.getcwd(), self._model_name).replace("\\", "/") + + out = None + if (os.path.exists(getExeFile)): + if not os.access(getExeFile, os.X_OK): # ensure that executable permission is set + st = os.stat(getExeFile) + os.chmod(getExeFile, st.st_mode | stat.S_IEXEC) + cmd = getExeFile + override + csvinput + r + simflags + if (platform.system() == "Windows"): + omhome = os.path.join(os.environ.get("OPENMODELICAHOME")) + dllPath = os.path.join(omhome, "bin").replace("\\", "/") + os.pathsep + \ + os.path.join(omhome, "lib/omc").replace("\\", "/") + os.pathsep + \ + os.path.join(omhome, "lib/omc/cpp").replace("\\", "/") + os.pathsep + \ + os.path.join(omhome, "lib/omc/omsicpp").replace("\\", "/") + my_env = os.environ.copy() + my_env["PATH"] = dllPath + os.pathsep + my_env["PATH"] + p = subprocess.Popen(cmd, env=my_env) + p.wait() + p.terminate() + else: + print(str(cmd)) + p = subprocess.run(shlex.split(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + out = p.stdout # .read() + self.simulationFlag = True + if self.xmlFile is not None: + os.chdir(cwd_current) + + else: + raise ModelicaSystemError("Modelica application file not generated yet") + return out diff --git a/OMPython/__init__.py b/OMPython/__init__.py index de861736..8b65edc7 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -7,12 +7,18 @@ omc.sendExpression("command") """ +from OMPython.OMBase import ( + ModelicaSystemBase, + ModelicaSystemError, +) from OMPython.ModelicaSystem import ( LinearizationResult, ModelicaSystem, ModelicaSystemCmd, ModelicaSystemDoE, - ModelicaSystemError, +) +from OMPython.OMRunner import ( + ModelicaSystemRunner, ) from OMPython.OMCSession import ( OMCSessionCmd, @@ -30,9 +36,11 @@ __all__ = [ 'LinearizationResult', 'ModelicaSystem', + 'ModelicaSystemBase', 'ModelicaSystemCmd', 'ModelicaSystemDoE', 'ModelicaSystemError', + 'ModelicaSystemRunner', 'OMCSessionCmd', 'OMCSessionException',