Skip to content

Commit

Permalink
Merge pull request #277 from nanglo123/fix_stella_expression_bug
Browse files Browse the repository at this point in the history
Fix expression creation for Stella
  • Loading branch information
bgyori authored Feb 5, 2024
2 parents 0033d04 + 1360b9d commit bfc4432
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 66 deletions.
2 changes: 1 addition & 1 deletion mira/modeling/amr/stockflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def __init__(self, model: Model):
flow_dict = {"id": fid}
flow_dict['name'] = flow.template.display_name
flow_dict['upstream_stock'] = flow.consumed[0].concept.name
flow_dict['downstream_stock'] = flow.produced[0].concept.name
flow_dict['downstream_stock'] = flow.produced[0].concept.name if flow.produced else None

if flow.template.rate_law:
rate_law = flow.template.rate_law.args[0]
Expand Down
113 changes: 80 additions & 33 deletions mira/sources/system_dynamics/stella.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@
template_model_from_pysd_model,
)

EXPRESSION_PER_LEVEL_MAP = {}


def template_model_from_stella_model_file(fname) -> TemplateModel:
"""Return a template model from a local Stella model file.
Parameters
----------
fname : Union[str,PosixPath]
fname : str or pathlib.Path
The path to the local Stella model file
Returns
Expand Down Expand Up @@ -108,48 +106,68 @@ def extract_stella_variable_expressions(stella_model_file):
Mapping of variable name to string variable expression
"""
expression_map = {}
operand_operator_per_level_var_map = {}

for component in stella_model_file.sections[0].components:
if isinstance(component, ControlElement):
continue
# test to see if flow first as flow is a subclass or Aux
elif isinstance(component, Flow):
operands = component.components[0][1].arguments
operators = component.components[0][1].operators
EXPRESSION_PER_LEVEL_MAP[component.name] = {}
operand_operator_per_level_var_map[component.name] = {}
extract_variables(
operands, operators, component.name
operands,
operators,
component.name,
operand_operator_per_level_var_map,
)
elif isinstance(component, Aux):
expression_map[component.name] = str(component.components[0][1])

elif isinstance(component, Stock):
try:
EXPRESSION_PER_LEVEL_MAP[component.name] = {}
operand_operator_per_level_var_map[component.name] = {}
if component.name == "recovered":
pass
operands = component.components[0][1].flow.arguments
operators = component.components[0][1].flow.operators
extract_variables(
operands, operators, component.name
operands,
operators,
component.name,
operand_operator_per_level_var_map,
)
# If the stock only has a reference and no operators in its expression
except AttributeError:
expression_map[component.name] = component.components[0][
1
].flow.reference
EXPRESSION_PER_LEVEL_MAP[component.name] = {}
EXPRESSION_PER_LEVEL_MAP[component.name][0] = {}
EXPRESSION_PER_LEVEL_MAP[component.name][0]["operands"] = [
component.components[0][1].flow.reference
]

for var_name, expr_level_dict in EXPRESSION_PER_LEVEL_MAP.items():
operand_operator_per_level_var_map[component.name] = {}
operand_operator_per_level_var_map[component.name][0] = {}
operand_operator_per_level_var_map[component.name][0][
"operands"
] = [component.components[0][1].flow.reference]

# construct the expression for each variable once its operators and operands are mapped
for var_name, expr_level_dict in operand_operator_per_level_var_map.items():
expression_map[var_name] = create_expression(expr_level_dict)
return expression_map


def create_expression(expr_level_dict):
"""When a variable's operators and operands are mapped, construct the string expression.
Parameters
----------
expr_level_dict : dict[int,Any]
The mapping of level to operands and operators present in a level of an expression
Returns
-------
: str
The string expression
"""
str_expression = ""
for level, ops in expr_level_dict.items():
for level, ops in reversed(expr_level_dict.items()):
operands = ops["operands"]
operators = ops["operators"] if ops.get("operators") else []
if not operators:
Expand All @@ -160,9 +178,11 @@ def create_expression(expr_level_dict):
level_expr = "("
for idx, operand in enumerate(operands):
try:
# usually operand is a reference structure
if operators[idx] != "negative":
level_expr += operand + operators[idx]
if level == len(expr_level_dict) - 1:
level_expr += operand + operators[idx]
else:
level_expr += operators[idx] + operand
else:
level_expr += "-" + operand
except IndexError:
Expand All @@ -172,14 +192,19 @@ def create_expression(expr_level_dict):
level_expr = ""
for idx, operand in enumerate(operands):
if operators[idx] != "negative":
level_expr = operand + operators[idx]
if level == len(expr_level_dict) - 1:
level_expr = operand + operators[idx]
else:
level_expr = operators[idx] + operand
else:
level_expr += "-" + operand
str_expression += level_expr
return str_expression


def extract_variables(operands, operators, name):
def extract_variables(
operands, operators, name, operand_operator_per_level_var_map
):
"""Helper method to construct an expression for each variable in a Stella model
Parameters
Expand All @@ -190,31 +215,53 @@ def extract_variables(operands, operators, name):
List of operators in an expression for a variable
name : str
Name of the variable
operand_operator_per_level_var_map : dict[str,Any]
Mapping of variable name to operators and operands associated with the level they are
encountered
"""
EXPRESSION_PER_LEVEL_MAP[name][0] = {}
EXPRESSION_PER_LEVEL_MAP[name][0]["operators"] = []
EXPRESSION_PER_LEVEL_MAP[name][0]["operands"] = []
EXPRESSION_PER_LEVEL_MAP[name][0]["operators"].extend(operators)
operand_operator_per_level_var_map[name][0] = {}
operand_operator_per_level_var_map[name][0]["operators"] = []
operand_operator_per_level_var_map[name][0]["operands"] = []
operand_operator_per_level_var_map[name][0]["operators"].extend(operators)
for idx, operand in enumerate(operands):
parse_structures(operand, 0, name)
parse_structures(operand, 0, name, operand_operator_per_level_var_map)


def parse_structures(operand, idx, name):
if EXPRESSION_PER_LEVEL_MAP[name].get(idx) is None:
EXPRESSION_PER_LEVEL_MAP[name][idx] = {}
EXPRESSION_PER_LEVEL_MAP[name][idx]["operators"] = []
EXPRESSION_PER_LEVEL_MAP[name][idx]["operands"] = []
def parse_structures(operand, idx, name, operand_operator_per_level_var_map):
"""Recursive helper method that retrieves each operand associated with a ReferenceStructure
object and operators associated with an ArithmeticStructure object. ArithmeticStructures can
contain ArithmeticStructure or ReferenceStructure Objects.
Parameters
----------
operand : ReferenceStructure or ArithmeticStructure
The operand in an expression
idx : int
The level at which the operand is encountered in an expression (e.g. 5+(7-3). The
operands 7 and 3 are considered as level 1 and 5 is considered as level 5).
name : str
The name of the variable
operand_operator_per_level_var_map : dict[str,Any]
Mapping of variable name to operators and operands associated with the level they are
encountered
"""
if operand_operator_per_level_var_map[name].get(idx) is None:
operand_operator_per_level_var_map[name][idx] = {}
operand_operator_per_level_var_map[name][idx]["operators"] = []
operand_operator_per_level_var_map[name][idx]["operands"] = []

# base case
if isinstance(operand, ReferenceStructure):
EXPRESSION_PER_LEVEL_MAP[name][idx]["operands"].append(
operand_operator_per_level_var_map[name][idx]["operands"].append(
operand.reference
)
return
elif isinstance(operand, ArithmeticStructure):
for struct in operand.arguments:
parse_structures(struct, idx + 1, name)
EXPRESSION_PER_LEVEL_MAP[name][idx + 1]["operators"].extend(
parse_structures(
struct, idx + 1, name, operand_operator_per_level_var_map
)
operand_operator_per_level_var_map[name][idx + 1]["operators"].extend(
operand.operators
)
2 changes: 1 addition & 1 deletion mira/sources/system_dynamics/vensim.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def template_model_from_mdl_file(fname) -> TemplateModel:
Parameters
----------
fname : Union[str,PosixPath]
fname : str or pathlib.Path
The path to the local Vensim file
Returns
Expand Down
Loading

0 comments on commit bfc4432

Please sign in to comment.