Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 115 additions & 60 deletions OMPython/ModelicaSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import numpy as np

from OMPython.OMCSession import (
ModelExecutionData,
ModelExecutionException,

OMCSessionException,
OMCSessionRunData,
OMCSession,
OMCSessionLocal,
OMCPath,
Expand All @@ -34,7 +36,7 @@

class ModelicaSystemError(Exception):
"""
Exception used in ModelicaSystem and ModelicaSystemCmd classes.
Exception used in ModelicaSystem classes.
"""


Expand Down Expand Up @@ -89,7 +91,7 @@ def __getitem__(self, index: int):
return {0: self.A, 1: self.B, 2: self.C, 3: self.D}[index]


class ModelicaSystemCmd:
class ModelExecutionCmd:
"""
All information about a compiled model executable. This should include data about all structured parameters, i.e.
parameters which need a recompilation of the model. All non-structured parameters can be easily changed without
Expand All @@ -98,16 +100,22 @@ class ModelicaSystemCmd:

def __init__(
self,
session: OMCSession,
runpath: OMCPath,
modelname: Optional[str] = None,
runpath: os.PathLike,
cmd_prefix: list[str],
cmd_local: bool = False,
cmd_windows: bool = False,
timeout: float = 10.0,
model_name: Optional[str] = None,
) -> None:
if modelname is None:
raise ModelicaSystemError("Missing model name!")
if model_name is None:
raise ModelExecutionException("Missing model name!")

self._session = session
self._runpath = runpath
self._model_name = modelname
self._cmd_local = cmd_local
self._cmd_windows = cmd_windows
self._cmd_prefix = cmd_prefix
self._runpath = pathlib.PurePosixPath(runpath)
self._model_name = model_name
self._timeout = timeout

# dictionaries of command line arguments for the model executable
self._args: dict[str, str | None] = {}
Expand Down Expand Up @@ -152,26 +160,26 @@ def override2str(
elif isinstance(oval, numbers.Number):
oval_str = str(oval)
else:
raise ModelicaSystemError(f"Invalid value for override key {okey}: {type(oval)}")
raise ModelExecutionException(f"Invalid value for override key {okey}: {type(oval)}")

return f"{okey}={oval_str}"

if not isinstance(key, str):
raise ModelicaSystemError(f"Invalid argument key: {repr(key)} (type: {type(key)})")
raise ModelExecutionException(f"Invalid argument key: {repr(key)} (type: {type(key)})")
key = key.strip()

if isinstance(val, dict):
if key != 'override':
raise ModelicaSystemError("Dictionary input only possible for key 'override'!")
raise ModelExecutionException("Dictionary input only possible for key 'override'!")

for okey, oval in val.items():
if not isinstance(okey, str):
raise ModelicaSystemError("Invalid key for argument 'override': "
f"{repr(okey)} (type: {type(okey)})")
raise ModelExecutionException("Invalid key for argument 'override': "
f"{repr(okey)} (type: {type(okey)})")

if not isinstance(oval, (str, bool, numbers.Number, type(None))):
raise ModelicaSystemError(f"Invalid input for 'override'.{repr(okey)}: "
f"{repr(oval)} (type: {type(oval)})")
raise ModelExecutionException(f"Invalid input for 'override'.{repr(okey)}: "
f"{repr(oval)} (type: {type(oval)})")

if okey in self._arg_override:
if oval is None:
Expand All @@ -193,7 +201,7 @@ def override2str(
elif isinstance(val, numbers.Number):
argval = str(val)
else:
raise ModelicaSystemError(f"Invalid argument value for {repr(key)}: {repr(val)} (type: {type(val)})")
raise ModelExecutionException(f"Invalid argument value for {repr(key)}: {repr(val)} (type: {type(val)})")

if key in self._args:
logger.warning(f"Override model executable argument: {repr(key)} = {repr(argval)} "
Expand Down Expand Up @@ -233,7 +241,7 @@ def get_cmd_args(self) -> list[str]:

return cmdl

def definition(self) -> OMCSessionRunData:
def definition(self) -> ModelExecutionData:
"""
Define all needed data to run the model executable. The data is stored in an OMCSessionRunData object.
"""
Expand All @@ -242,18 +250,50 @@ def definition(self) -> OMCSessionRunData:
if not isinstance(result_file, str):
result_file = (self._runpath / f"{self._model_name}.mat").as_posix()

omc_run_data = OMCSessionRunData(
cmd_path=self._runpath.as_posix(),
# as this is the local implementation, pathlib.Path can be used
cmd_path = self._runpath

cmd_library_path = None
if self._cmd_local and self._cmd_windows:
cmd_library_path = ""

# set the process environment from the generated .bat file in windows which should have all the dependencies
# for this pathlib.PurePosixPath() must be converted to a pathlib.Path() object, i.e. WindowsPath
path_bat = pathlib.Path(cmd_path) / f"{self._model_name}.bat"
if not path_bat.is_file():
raise ModelExecutionException("Batch file (*.bat) does not exist " + str(path_bat))

content = path_bat.read_text(encoding='utf-8')
for line in content.splitlines():
match = re.match(pattern=r"^SET PATH=([^%]*)", string=line, flags=re.IGNORECASE)
if match:
cmd_library_path = match.group(1).strip(';') # Remove any trailing semicolons
my_env = os.environ.copy()
my_env["PATH"] = cmd_library_path + os.pathsep + my_env["PATH"]

cmd_model_executable = cmd_path / f"{self._model_name}.exe"
else:
# for Linux the paths to the needed libraries should be included in the executable (using rpath)
cmd_model_executable = cmd_path / self._model_name

# define local(!) working directory
cmd_cwd_local = None
if self._cmd_local:
cmd_cwd_local = cmd_path.as_posix()

omc_run_data = ModelExecutionData(
cmd_path=cmd_path.as_posix(),
cmd_model_name=self._model_name,
cmd_args=self.get_cmd_args(),
cmd_result_path=result_file,
)

omc_run_data_updated = self._session.omc_run_data_update(
omc_run_data=omc_run_data,
cmd_result_file=result_file,
cmd_prefix=self._cmd_prefix,
cmd_library_path=cmd_library_path,
cmd_model_executable=cmd_model_executable.as_posix(),
cmd_cwd_local=cmd_cwd_local,
cmd_timeout=self._timeout,
)

return omc_run_data_updated
return omc_run_data

@staticmethod
def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | numbers.Number]]:
Expand All @@ -262,17 +302,19 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | n

The return data can be used as input for self.args_set().
"""
warnings.warn(message="The argument 'simflags' is depreciated and will be removed in future versions; "
"please use 'simargs' instead",
category=DeprecationWarning,
stacklevel=2)
warnings.warn(
message="The argument 'simflags' is depreciated and will be removed in future versions; "
"please use 'simargs' instead",
category=DeprecationWarning,
stacklevel=2,
)

simargs: dict[str, Optional[str | dict[str, Any] | numbers.Number]] = {}

args = [s for s in simflags.split(' ') if s]
for arg in args:
if arg[0] != '-':
raise ModelicaSystemError(f"Invalid simulation flag: {arg}")
raise ModelExecutionException(f"Invalid simulation flag: {arg}")
arg = arg[1:]
parts = arg.split('=')
if len(parts) == 1:
Expand All @@ -284,12 +326,12 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | n
for item in override.split(','):
kv = item.split('=')
if not 0 < len(kv) < 3:
raise ModelicaSystemError(f"Invalid value for '-override': {override}")
raise ModelExecutionException(f"Invalid value for '-override': {override}")
if kv[0]:
try:
override_dict[kv[0]] = kv[1]
except (KeyError, IndexError) as ex:
raise ModelicaSystemError(f"Invalid value for '-override': {override}") from ex
raise ModelExecutionException(f"Invalid value for '-override': {override}") from ex

simargs[parts[0]] = override_dict

Expand Down Expand Up @@ -528,6 +570,25 @@ def getWorkDirectory(self) -> OMCPath:
"""
return self._work_dir

def check_model_executable(self):
"""
Check if the model executable is working
"""
# check if the executable exists ...
om_cmd = ModelExecutionCmd(
runpath=self.getWorkDirectory(),
cmd_local=self._session.model_execution_local,
cmd_windows=self._session.model_execution_windows,
cmd_prefix=self._session.model_execution_prefix(cwd=self.getWorkDirectory()),
model_name=self._model_name,
)
# ... by running it - output help for command help
om_cmd.arg_set(key="help", val="help")
cmd_definition = om_cmd.definition()
returncode = cmd_definition.run()
if returncode != 0:
raise ModelicaSystemError("Model executable not working!")

def buildModel(self, variableFilter: Optional[str] = None):
filter_def: Optional[str] = None
if variableFilter is not None:
Expand All @@ -544,17 +605,7 @@ def buildModel(self, variableFilter: Optional[str] = None):
logger.debug("OM model build result: %s", build_model_result)

# check if the executable exists ...
om_cmd = ModelicaSystemCmd(
session=self._session,
runpath=self.getWorkDirectory(),
modelname=self._model_name,
)
# ... by running it - output help for command help
om_cmd.arg_set(key="help", val="help")
cmd_definition = om_cmd.definition()
returncode = self._session.run_model_executable(cmd_run_data=cmd_definition)
if returncode != 0:
raise ModelicaSystemError("Model executable not working!")
self.check_model_executable()

xml_file = self._session.omcpath(build_model_result[0]).parent / build_model_result[1]
self._xmlparse(xml_file=xml_file)
Expand Down Expand Up @@ -1033,7 +1084,7 @@ def _parse_om_version(self, version: str) -> tuple[int, int, int]:

def _process_override_data(
self,
om_cmd: ModelicaSystemCmd,
om_cmd: ModelExecutionCmd,
override_file: OMCPath,
override_var: dict[str, str],
override_sim: dict[str, str],
Expand Down Expand Up @@ -1069,7 +1120,7 @@ def simulate_cmd(
result_file: OMCPath,
simflags: Optional[str] = None,
simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None,
) -> ModelicaSystemCmd:
) -> ModelExecutionCmd:
"""
This method prepares the simulates model according to the simulation options. It returns an instance of
ModelicaSystemCmd which can be used to run the simulation.
Expand All @@ -1091,10 +1142,12 @@ def simulate_cmd(
An instance if ModelicaSystemCmd to run the requested simulation.
"""

om_cmd = ModelicaSystemCmd(
session=self._session,
om_cmd = ModelExecutionCmd(
runpath=self.getWorkDirectory(),
modelname=self._model_name,
cmd_local=self._session.model_execution_local,
cmd_windows=self._session.model_execution_windows,
cmd_prefix=self._session.model_execution_prefix(cwd=self.getWorkDirectory()),
model_name=self._model_name,
)

# always define the result file to use
Expand Down Expand Up @@ -1183,7 +1236,7 @@ def simulate(
self._result_file.unlink()
# ... run simulation ...
cmd_definition = om_cmd.definition()
returncode = self._session.run_model_executable(cmd_run_data=cmd_definition)
returncode = cmd_definition.run()
# and check returncode *AND* resultfile
if returncode != 0 and self._result_file.is_file():
# check for an empty (=> 0B) result file which indicates a crash of the model executable
Expand Down Expand Up @@ -1786,10 +1839,12 @@ def linearize(
"use ModelicaSystem() to build the model first"
)

om_cmd = ModelicaSystemCmd(
session=self._session,
om_cmd = ModelExecutionCmd(
runpath=self.getWorkDirectory(),
modelname=self._model_name,
cmd_local=self._session.model_execution_local,
cmd_windows=self._session.model_execution_windows,
cmd_prefix=self._session.model_execution_prefix(cwd=self.getWorkDirectory()),
model_name=self._model_name,
)

self._process_override_data(
Expand Down Expand Up @@ -1829,7 +1884,7 @@ def linearize(
linear_file.unlink(missing_ok=True)

cmd_definition = om_cmd.definition()
returncode = self._session.run_model_executable(cmd_run_data=cmd_definition)
returncode = cmd_definition.run()
if returncode != 0:
raise ModelicaSystemError(f"Linearize failed with return code: {returncode}")
if not linear_file.is_file():
Expand Down Expand Up @@ -2015,7 +2070,7 @@ def __init__(
self._parameters = {}

self._doe_def: Optional[dict[str, dict[str, Any]]] = None
self._doe_cmd: Optional[dict[str, OMCSessionRunData]] = None
self._doe_cmd: Optional[dict[str, ModelExecutionData]] = None

def get_session(self) -> OMCSession:
"""
Expand Down Expand Up @@ -2134,7 +2189,7 @@ def get_doe_definition(self) -> Optional[dict[str, dict[str, Any]]]:
"""
return self._doe_def

def get_doe_command(self) -> Optional[dict[str, OMCSessionRunData]]:
def get_doe_command(self) -> Optional[dict[str, ModelExecutionData]]:
"""
Get the definitions of simulations commands to run for this DoE.
"""
Expand Down Expand Up @@ -2180,13 +2235,13 @@ def worker(worker_id, task_queue):
if cmd_definition is None:
raise ModelicaSystemError("Missing simulation definition!")

resultfile = cmd_definition.cmd_result_path
resultfile = cmd_definition.cmd_result_file
resultpath = self.get_session().omcpath(resultfile)

logger.info(f"[Worker {worker_id}] Performing task: {resultpath.name}")

try:
returncode = self.get_session().run_model_executable(cmd_run_data=cmd_definition)
returncode = cmd_definition.run()
logger.info(f"[Worker {worker_id}] Simulation {resultpath.name} "
f"finished with return code: {returncode}")
except ModelicaSystemError as ex:
Expand Down
Loading