Skip to content

Commit

Permalink
Merge pull request #280 from nanglo123/preprocess_stella_rate_laws
Browse files Browse the repository at this point in the history
Preprocess stella expressions for PySD parsing and add error handling for currently unparseable expressions
  • Loading branch information
bgyori authored Feb 15, 2024
2 parents a1dcd25 + 09a1840 commit e133987
Show file tree
Hide file tree
Showing 3 changed files with 412 additions and 67 deletions.
144 changes: 106 additions & 38 deletions mira/sources/system_dynamics/pysd.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module implements parsing of a generic pysd model irrespective of source and source type
and extracting its contents to create an equivalent MIRA template model.
"""
__all__ = ["template_model_from_pysd_model"]

import pandas as pd
import sympy
Expand All @@ -14,15 +15,22 @@
get_sympy,
)

CONTROL_VARIABLE_NAMES = {"FINALTIME", "INITIALTIME", "SAVEPER", "TIMESTEP"}
CONTROL_VARIABLE_NAMES = {
"FINALTIME",
"INITIALTIME",
"SAVEPER",
"TIMESTEP",
"FINAL TIME",
"INITIAL TIME",
"TIME STEP",
}
UNITS_MAPPING = {
sympy.Symbol("Person"): sympy.Symbol("person"),
sympy.Symbol("Persons"): sympy.Symbol("person"),
sympy.Symbol("Day"): sympy.Symbol("day"),
sympy.Symbol("Days"): sympy.Symbol("day"),
}

__all__ = ["template_model_from_pysd_model"]
SYMPY_FLOW_RATE_PLACEHOLDER = safe_parse_expr("xxplaceholderxx")


def template_model_from_pysd_model(pysd_model, expression_map) -> TemplateModel:
Expand Down Expand Up @@ -130,32 +138,70 @@ def template_model_from_pysd_model(pysd_model, expression_map) -> TemplateModel:
for state_initial_value, (state_name, state_concept) in zip(
state_initial_values, concepts.items()
):
initial = Initial(
concept=concepts[state_name].copy(deep=True),
expression=SympyExprStr(sympy.Float(state_initial_value)),
)
# for the case when a state's initial value is a numpy array
try:
initial = Initial(
concept=concepts[state_name].copy(deep=True),
expression=SympyExprStr(sympy.Float(state_initial_value)),
)
except TypeError:
initial = Initial(
concept=concepts[state_name].copy(deep=True),
expression=SympyExprStr("0")
)
mira_initials[initial.concept.name] = initial

# process parameters
# Currently cannot capture all parameters as some of them cannot have their expressions parsed
# Additionally, some auxiliary variables are added as parameters due to inability to parse
# the auxiliary expression and differentiate it from a parameter
mira_parameters = {}
for name, expression in processed_expression_map.items():
if expression.replace(".", "").replace(" ", "").isdecimal():
# Sometimes parameter values reference a stock rather than being a number
# Current placeholder for incorrectly constructed parameter expressions
try:
eval_expression = safe_parse_expr(expression).evalf()
except TypeError:
eval_expression = safe_parse_expr("0")

str_eval_expression = str(eval_expression)
value = None
is_initial = False
if str_eval_expression in mira_initials:
value = float(str(mira_initials[str_eval_expression].expression))
is_initial = True

# Replace negative signs for placeholder parameter value for Aux structures
# that cannot be parsed
if str_eval_expression in mira_initials or (
eval_expression != SYMPY_FLOW_RATE_PLACEHOLDER
and str_eval_expression.replace(".", "")
.replace(" ", "")
.replace("-", "")
.isdecimal()
):
if not is_initial:
value = float(str_eval_expression)
model_parameter_info = model_doc_df[model_doc_df["Py Name"] == name]
if model_parameter_info["Units"].values[0]:
if (
model_parameter_info["Units"].values[0]
and model_parameter_info["Units"].values[0] != "dimensionless"
):
unit_text = (
model_parameter_info["Units"].values[0].replace(" ", "")
)

parameter = {
"id": name,
"value": float(expression),
"value": value,
"description": model_parameter_info["Comment"].values[0],
"units": {"expression": unit_text},
}
else:
# if units don't exist
parameter = {
"id": name,
"value": float(expression),
"value": value,
"description": model_parameter_info["Comment"].values[0],
}

Expand All @@ -168,44 +214,61 @@ def template_model_from_pysd_model(pysd_model, expression_map) -> TemplateModel:
param_unit.expression = param_unit.expression.subs(
old_unit_symbol, new_unit_symbol
)

# construct transitions mapping that determine inputs and outputs states to a rate-law
transition_map = {}
auxiliaries = model_doc_df[model_doc_df["Type"] == "Auxiliary"]
auxiliaries = model_doc_df[
(model_doc_df["Type"] == "Auxiliary")
| (model_doc_df["Type"] == "Constant")
]

# currently, we add every auxiliary to the map of transitions even if it is not a transition
# no set way to differentiate between auxiliaries of transitions
for index, aux_tuple in auxiliaries.iterrows():
if (
aux_tuple["Subtype"] == "Normal"
and aux_tuple["Real Name"] not in CONTROL_VARIABLE_NAMES
):
rate_name = aux_tuple["Py Name"]
rate_expr = safe_parse_expr(
preprocess_text(processed_expression_map[rate_name]),
symbols,
)
input_state, output_state, controller = None, None, None
# Current placeholder for incorrectly constructed rate/parameter expressions
try:
rate_expr = safe_parse_expr(
processed_expression_map[rate_name],
symbols,
)
except TypeError:
rate_expr = SYMPY_FLOW_RATE_PLACEHOLDER

inputs, outputs, controllers = [], [], []

# If we come across a rate-law that is leaving a state, we add the state as an input
# to the rate-law, vice-versa if a rate-law is going into a state.
for state_name, in_out in state_rate_map.items():
if rate_name in in_out["output_rates"]:
input_state = state_name
if rate_name in in_out["input_rates"]:
output_state = state_name
for state_name, in_out_rate_map in state_rate_map.items():
if rate_name in in_out_rate_map["output_rates"]:
inputs.append(state_name)
if rate_name in in_out_rate_map["input_rates"]:
outputs.append(state_name)
# if a state isn't consumed by a flow (the flow isn't listed as an output of
# the state) but affects the rate of a flow, then that state is a controller
if (
sympy.Symbol(state_name) in rate_expr.free_symbols
and rate_name
not in state_rate_map[state_name]["output_rates"]
):
controller = output_state
controllers.append(state_name)

# if the auxiliary does not have inputs, outputs, or controllers, we know it is not
# a transition
# some flows also used as auxiliaries for other flows' expression rates and thus are
# not converted to templates
if not inputs and not outputs and not controllers:
continue

transition_map[rate_name] = {
"name": rate_name,
"expression": rate_expr,
"input": input_state,
"output": output_state,
"controller": controller,
"inputs": inputs,
"outputs": outputs,
"controllers": controllers,
}

used_states = set()
Expand All @@ -215,30 +278,33 @@ def template_model_from_pysd_model(pysd_model, expression_map) -> TemplateModel:
for template_id, (transition_name, transition) in enumerate(
transition_map.items()
):
input_concepts = []
output_concepts = []
input_concepts, input_names = [], []
output_concepts, output_names = [], []
controller_concepts = []
input_name, output_name, controller_name = None, None, None
if transition.get("input"):
input_name = transition.get("input")

for input_name in transition.get("inputs"):
input_concepts.append(concepts[input_name].copy(deep=True))
if transition.get("output"):
output_name = transition.get("output")
input_names.append(input_name)

for output_name in transition.get("outputs"):
output_concepts.append(concepts[output_name].copy(deep=True))
if transition.get("controller"):
controller_name = transition.get("controller")
output_names.append(output_name)

for controller_name in transition.get("controllers"):
controller_concepts.append(
concepts[controller_name].copy(deep=True)
)

used_states |= {input_name, output_name}
used_states.update(input_names, output_names)

templates_.extend(
transition_to_templates(
input_concepts=input_concepts,
output_concepts=output_concepts,
controller_concepts=controller_concepts,
transition_rate=transition["expression"],
transition_rate=transition["expression"]
if transition["expression"] != SYMPY_FLOW_RATE_PLACEHOLDER
else None,
transition_id=str(template_id + 1),
transition_name=transition_name,
)
Expand Down Expand Up @@ -293,6 +359,8 @@ def preprocess_text(expr_text):
# strip leading and trailing white spaces
# remove spaces between operators and operands
# replace space between two words that makeup a variable name with "_"'
if not expr_text:
return expr_text
expr_text = (
expr_text.strip()
.replace(" * ", "*")
Expand Down
Loading

0 comments on commit e133987

Please sign in to comment.