
Merge branch 'optapp/NLOPT-integration' into optapp/new-shape-control
RezaNajian committed Jul 4, 2023
2 parents 1b03e03 + 8ef4366 commit 8e1274d
Showing 13 changed files with 5,152 additions and 16 deletions.
@@ -0,0 +1,184 @@
try:
    import nlopt
except ImportError:
    raise Exception("NLOPT python library is not available")

import KratosMultiphysics as Kratos
from KratosMultiphysics.OptimizationApplication.utilities.optimization_problem import OptimizationProblem
from KratosMultiphysics.OptimizationApplication.algorithms.standardized_NLOPT_objective import StandardizedNLOPTObjective
from KratosMultiphysics.OptimizationApplication.algorithms.standardized_NLOPT_constraint import StandardizedNLOPTConstraint
from KratosMultiphysics.OptimizationApplication.controls.master_control import MasterControl
from KratosMultiphysics.OptimizationApplication.algorithms.algorithm import Algorithm
from KratosMultiphysics.OptimizationApplication.utilities.component_data_view import ComponentDataView
from KratosMultiphysics.OptimizationApplication.utilities.logger_utilities import DictLogger
from KratosMultiphysics.OptimizationApplication.utilities.helper_utilities import CallOnAll


def Factory(model: Kratos.Model, parameters: Kratos.Parameters, optimization_problem: OptimizationProblem):
    return NLOPTAlgorithms(model, parameters, optimization_problem)

class NLOPTAlgorithms(Algorithm):
    """
    A wrapper for the gradient-based optimization algorithms of the NLOPT python library.
    """

    @classmethod
    def GetDefaultParameters(cls):
        return Kratos.Parameters("""{
            "module"         : "KratosMultiphysics.OptimizationApplication.algorithms",
            "type"           : "PLEASE_PROVIDE_AN_ALGORITHM_CLASS_NAME",
            "objective"      : {},
            "constraints"    : [],
            "controls"       : [],
            "echo_level"     : 0,
            "NLOPT_settings" : {
                "algorithm_name"    : "MMA",
                "stopping_criteria" : {
                    "objective_rel_tol": 0,
                    "objective_abs_tol": 0,
                    "controls_rel_tol" : 0,
                    "controls_abs_tol" : 0,
                    "maximum_function_evaluation": 10
                }
            }
        }""")

    def __init__(self, model: Kratos.Model, parameters: Kratos.Parameters, optimization_problem: OptimizationProblem):
        self.model = model
        self.parameters = parameters
        self._optimization_problem = optimization_problem

        parameters.ValidateAndAssignDefaults(self.GetDefaultParameters())

        # controls
        self.master_control = MasterControl()
        for control_name in parameters["controls"].GetStringArray():
            control = optimization_problem.GetControl(control_name)
            self.master_control.AddControl(control)
        self.__control_field = None

        # objective & constraints
        self.__objective = StandardizedNLOPTObjective(parameters["objective"], self.master_control, self._optimization_problem)
        self.__constraints = []
        for constraint_settings in parameters["constraints"]:
            self.__constraints.append(StandardizedNLOPTConstraint(constraint_settings, self.master_control, self._optimization_problem))

        # nlopt settings
        NLOPT_settings = parameters["NLOPT_settings"]
        NLOPT_settings.ValidateAndAssignDefaults(self.GetDefaultParameters()["NLOPT_settings"])

        # nlopt algorithm
        self.algorithm_name = NLOPT_settings["algorithm_name"].GetString()
        if len(self.__constraints) == 0:
            self.CheckOptimizerSupport(self.algorithm_name, None)
        else:
            for constraint in self.__constraints:
                if constraint.IsEqualityType():
                    self.CheckOptimizerSupport(self.algorithm_name, "equality")
                else:
                    self.CheckOptimizerSupport(self.algorithm_name, "inequality")

        # stopping criteria
        self.stopping_criteria = NLOPT_settings["stopping_criteria"]
        self.stopping_criteria.ValidateAndAssignDefaults(self.GetDefaultParameters()["NLOPT_settings"]["stopping_criteria"])

    def GetMinimumBufferSize(self) -> int:
        return 2

    def Check(self):
        pass

    def Initialize(self):
        self.converged = False
        self.__objective.Initialize()
        self.__objective.Check()
        for constraint in self.__constraints:
            constraint.Initialize()
            constraint.Check()
        self.master_control.Initialize()
        self.__control_field = self.master_control.GetControlField()
        self.algorithm_data = ComponentDataView("algorithm", self._optimization_problem)

        # create nlopt optimizer
        self.x0 = self.__control_field.Evaluate()
        self.nlopt_optimizer = nlopt.opt(self.GetOptimizer(self.algorithm_name), self.x0.size)

        # assign objective and constraints
        self.nlopt_optimizer.set_min_objective(self.__objective.CalculateStandardizedValueAndGradients)
        for constraint in self.__constraints:
            # bind the current constraint via a default argument; a plain closure over
            # the loop variable would make every lambda call the last constraint.
            if constraint.IsEqualityType():
                self.nlopt_optimizer.add_equality_constraint(lambda x, grad, c=constraint: c.CalculateStandardizedValueAndGradients(x, grad), 1e-8)
            else:
                self.nlopt_optimizer.add_inequality_constraint(lambda x, grad, c=constraint: c.CalculateStandardizedValueAndGradients(x, grad), 1e-8)

        # now set stopping criteria
        self.nlopt_optimizer.set_ftol_rel(self.stopping_criteria["objective_rel_tol"].GetDouble())
        self.nlopt_optimizer.set_ftol_abs(self.stopping_criteria["objective_abs_tol"].GetDouble())
        self.nlopt_optimizer.set_xtol_rel(self.stopping_criteria["controls_rel_tol"].GetDouble())
        self.nlopt_optimizer.set_xtol_abs(self.stopping_criteria["controls_abs_tol"].GetDouble())
        self.nlopt_optimizer.set_maxeval(self.stopping_criteria["maximum_function_evaluation"].GetInt())

    def Finalize(self):
        self.__objective.Finalize()
        for constraint in self.__constraints:
            constraint.Finalize()
        self.master_control.Finalize()

    def Solve(self):
        self.nlopt_optimizer.optimize(self.x0)
        CallOnAll(self._optimization_problem.GetListOfProcesses("output_processes"), Kratos.OutputProcess.PrintOutput)
        self.LogTermination()

    def LogTermination(self):
        stopval = self.nlopt_optimizer.get_stopval()
        maxtime = self.nlopt_optimizer.get_maxtime()
        nlopt_result_map = {
            nlopt.SUCCESS: 'Generic success return value.',
            nlopt.STOPVAL_REACHED: f'Optimization stopped because stopval ({stopval}) was reached.',
            nlopt.FTOL_REACHED: f'Optimization stopped because ftol_rel ({self.stopping_criteria["objective_rel_tol"].GetDouble()}) or ftol_abs ({self.stopping_criteria["objective_abs_tol"].GetDouble()}) was reached.',
            nlopt.XTOL_REACHED: f'Optimization stopped because xtol_rel ({self.stopping_criteria["controls_rel_tol"].GetDouble()}) or xtol_abs ({self.stopping_criteria["controls_abs_tol"].GetDouble()}) was reached.',
            nlopt.MAXEVAL_REACHED: f'Optimization stopped because maxeval ({self.stopping_criteria["maximum_function_evaluation"].GetInt()}) was reached.',
            nlopt.MAXTIME_REACHED: f'Optimization stopped because maxtime ({maxtime}) was reached.',
            nlopt.FAILURE: 'Generic failure return value.',
            nlopt.INVALID_ARGS: 'Invalid arguments (e.g. lower bounds are bigger than upper bounds, an unknown algorithm was specified, etcetera).',
            nlopt.OUT_OF_MEMORY: 'Ran out of memory.',
            nlopt.ROUNDOFF_LIMITED: 'Halted because roundoff errors limited progress. (In this case, the optimization still typically returns a useful result.)',
            nlopt.FORCED_STOP: 'Halted because of a forced termination: the user called nlopt_force_stop(opt) on the optimization’s nlopt_opt object opt from the user’s objective function or constraints.'
        }
        info = {"Termination": nlopt_result_map[self.nlopt_optimizer.last_optimize_result()]}
        DictLogger(f"NLOPT-{self.algorithm_name}", info)

    def CheckOptimizerSupport(self, algorithm_name, constraint_type):
        # Unconstrained gradient-based optimizers
        unconstrained_optimizers = {
            "LBFGS": nlopt.LD_LBFGS
        }

        # Constrained gradient-based optimizers supporting equality constraints
        equality_constrained_optimizers = {
            "SLSQP": nlopt.LD_SLSQP
        }

        # Constrained gradient-based optimizers supporting inequality constraints
        inequality_constrained_optimizers = {
            "MMA": nlopt.LD_MMA,
            "SLSQP": nlopt.LD_SLSQP
        }

        if constraint_type == "equality":
            if algorithm_name not in equality_constrained_optimizers:
                raise ValueError(f"The algorithm {algorithm_name} does not support equality constraints. Supported algorithms: {list(equality_constrained_optimizers.keys())}.")
        elif constraint_type == "inequality":
            if algorithm_name not in inequality_constrained_optimizers:
                raise ValueError(f"The algorithm {algorithm_name} does not support inequality constraints. Supported algorithms: {list(inequality_constrained_optimizers.keys())}.")
        else:
            if algorithm_name not in unconstrained_optimizers:
                raise ValueError(f"The algorithm {algorithm_name} is not supported for unconstrained optimization. Supported algorithms: {list(unconstrained_optimizers.keys())}.")
        return True

    def GetOptimizer(self, name):
        nlopt_algorithm_mapping = {
            "MMA"  : nlopt.LD_MMA,
            "SLSQP": nlopt.LD_SLSQP,
            "LBFGS": nlopt.LD_LBFGS
        }
        return nlopt_algorithm_mapping[name]
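For orientation, a minimal configuration sketch for this class, assuming the settings schema of GetDefaultParameters() above. The "type" value and the "mass"/"shape_control" names are hypothetical placeholders, and the objective sub-block keys are assumed from the standardized objective convention rather than shown in this diff:

import KratosMultiphysics as Kratos

# hypothetical settings; only the keys mirror GetDefaultParameters() above,
# missing stopping criteria are filled in by ValidateAndAssignDefaults.
algorithm_settings = Kratos.Parameters("""{
    "module"         : "KratosMultiphysics.OptimizationApplication.algorithms",
    "type"           : "NLOPT_algorithms",
    "objective"      : {"response_name": "mass", "type": "minimization"},
    "constraints"    : [],
    "controls"       : ["shape_control"],
    "echo_level"     : 0,
    "NLOPT_settings" : {
        "algorithm_name"    : "LBFGS",
        "stopping_criteria" : {
            "maximum_function_evaluation": 50
        }
    }
}""")

With such a block, Factory(model, algorithm_settings, optimization_problem) would construct the algorithm; Solve() then runs the selected NLOPT optimizer (here the unconstrained LBFGS) until one of the stopping criteria fires.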
@@ -0,0 +1,180 @@
import KratosMultiphysics as Kratos
import KratosMultiphysics.OptimizationApplication as KratosOA
from KratosMultiphysics.OptimizationApplication.utilities.optimization_problem import OptimizationProblem
from KratosMultiphysics.OptimizationApplication.responses.response_routine import ResponseRoutine
from KratosMultiphysics.OptimizationApplication.controls.master_control import MasterControl
from KratosMultiphysics.OptimizationApplication.utilities.component_data_view import ComponentDataView
from KratosMultiphysics.OptimizationApplication.utilities.logger_utilities import DictLogger
from KratosMultiphysics.OptimizationApplication.utilities.logger_utilities import TimeLogger
from KratosMultiphysics.OptimizationApplication.utilities.helper_utilities import CallOnAll
import numpy
import sys
import time as timer
import datetime

class StandardizedNLOPTConstraint(ResponseRoutine):
    """Standardized constraint response function

    This class creates instances to standardize any response function for the specified type of the constraint.

    Supported constraint types:
        "=",
        "<",
        ">"

    The reference value for the constraint can either be the "initial_value" or a specified value.
    """
    def __init__(self, parameters: Kratos.Parameters, master_control: MasterControl, optimization_problem: OptimizationProblem, required_buffer_size: int = 2):
        default_parameters = Kratos.Parameters("""{
            "response_name": "",
            "type"         : "",
            "ref_value"    : "initial_value"
        }""")

        # if a numeric reference value is given, switch the default to a double
        # so that ValidateAndAssignDefaults accepts it.
        if parameters.Has("ref_value") and parameters["ref_value"].IsDouble():
            default_parameters["ref_value"].SetDouble(0.0)

        parameters.ValidateAndAssignDefaults(default_parameters)

        response = optimization_problem.GetResponse(parameters["response_name"].GetString())

        super().__init__(master_control, response)

        if required_buffer_size < 2:
            raise RuntimeError(f"Standardized constraint requires 2 as minimum buffer size. [ response name = {self.GetReponse().GetName()} ]")

        component_data_view = ComponentDataView(response, optimization_problem)
        component_data_view.SetDataBuffer(required_buffer_size)

        self.__optimization_problem = optimization_problem
        self.__buffered_data = component_data_view.GetBufferedData()
        self.__unbuffered_data = component_data_view.GetUnBufferedData()

        if parameters["ref_value"].IsDouble():
            self.__ref_type = "specified_value"
            self.__reference_value = parameters["ref_value"].GetDouble()
        elif parameters["ref_value"].IsString() and parameters["ref_value"].GetString() == "initial_value":
            self.__ref_type = "initial_value"
            self.__reference_value = None
        else:
            raise RuntimeError(f"Provided \"ref_value\" = {parameters['ref_value']} is not supported for constraint response functions. The following options are supported:\n\tinitial_value\n\tfloat value")

        self.__constraint_type = parameters["type"].GetString()
        if self.__constraint_type in ["=", "<"]:
            self.__scaling = 1.0
        elif self.__constraint_type in [">"]:
            self.__scaling = -1.0
            if self.__reference_value is not None: self.__reference_value *= -1
        else:
            raise RuntimeError(f"Provided \"type\" = {self.__constraint_type} is not supported in constraint response functions. The following options are supported:\n\t=\n\t<\n\t>")

        self.__zero_threshold = sys.float_info.epsilon

    def IsEqualityType(self) -> bool:
        return self.__constraint_type == "="

    def GetReferenceValue(self) -> float:
        if self.__reference_value is not None:
            return self.__reference_value
        else:
            if self.__unbuffered_data.HasValue("initial_value"):
                self.__reference_value = self.__unbuffered_data["initial_value"]
                return self.__reference_value
            else:
                raise RuntimeError(f"Response value for {self.GetReponse().GetName()} is not calculated yet.")

    def Initialize(self):
        super().Initialize()

    def CalculateStandardizedValueAndGradients(self, control_field: numpy.ndarray, gradient_field: numpy.ndarray, save_value: bool = True) -> float:
        # update the master control
        update_status = self.GetMasterControl().Update(control_field)
        master_control_updated = False
        for is_updated in update_status.values():
            if is_updated:
                master_control_updated = True
                break

        # output
        if master_control_updated:
            CallOnAll(self.__optimization_problem.GetListOfProcesses("output_processes"), Kratos.OutputProcess.PrintOutput)
            self.LogOptimizationStep()
            self.__optimization_problem.AdvanceStep()

        with TimeLogger(f"StandardizedNLOPTConstraint::Calculate {self.GetReponse().GetName()} value", None, "Finished"):
            self.response_value = self.CalculateValue()
            if not self.__unbuffered_data.HasValue("initial_value"):
                self.__unbuffered_data["initial_value"] = self.response_value

        if save_value:
            if self.__buffered_data.HasValue("value"): del self.__buffered_data["value"]
            self.__buffered_data["value"] = self.response_value

        DictLogger("Constraint info", self.GetInfo())

        ref_value = self.GetReferenceValue()
        standardized_response_value = self.response_value - ref_value
        standardization_factor = self.__scaling
        if abs(ref_value) > self.__zero_threshold:
            standardization_factor /= abs(ref_value)
        standardized_response_value *= standardization_factor

        if gradient_field.size > 0:
            gradient_collective_expression = self.CalculateGradient()
            gradient_field[:] = gradient_collective_expression.Evaluate() * standardization_factor

            if save_value:
                # save the physical gradients for post processing in the unbuffered data container.
                for physical_var, physical_gradient in self.GetRequiredPhysicalGradients().items():
                    variable_name = f"d{self.GetReponse().GetName()}_d{physical_var.Name()}"
                    for physical_gradient_expression in physical_gradient.GetContainerExpressions():
                        if self.__unbuffered_data.HasValue(variable_name): del self.__unbuffered_data[variable_name]
                        # cloning is a cheap operation: it only copies the underlying pointers
                        # and does not allocate additional memory.
                        self.__unbuffered_data[variable_name] = physical_gradient_expression.Clone()

                # save the filtered gradients for post processing in the unbuffered data container.
                for gradient_container_expression, control in zip(gradient_collective_expression.GetContainerExpressions(), self.GetMasterControl().GetListOfControls()):
                    variable_name = f"d{self.GetReponse().GetName()}_d{control.GetName()}"
                    if self.__unbuffered_data.HasValue(variable_name): del self.__unbuffered_data[variable_name]
                    # cloning is a cheap operation: it only copies the underlying pointers
                    # and does not allocate additional memory.
                    self.__unbuffered_data[variable_name] = gradient_container_expression.Clone()

        return standardized_response_value

    def GetValue(self, step_index: int = 0) -> float:
        return self.response_value

    def GetAbsoluteViolation(self, step_index: int = 0) -> float:
        ref_value = self.GetReferenceValue()
        standardized_response_value = self.response_value - ref_value
        standardization_factor = self.__scaling
        if abs(ref_value) > self.__zero_threshold:
            standardization_factor /= abs(ref_value)
        standardization_factor *= 100
        standardized_response_value *= standardization_factor
        return standardized_response_value

    def GetInfo(self) -> dict:
        info = {
            "name": self.GetReponse().GetName(),
            "value": self.GetValue(),
            "type": self.__constraint_type,
            "ref_value": self.GetReferenceValue(),
            "violation [%]": self.GetAbsoluteViolation()
        }
        return info

    def LogOptimizationStep(self):
        now = datetime.datetime.now()
        now_str = now.strftime("%Y-%m-%d %H:%M:%S")
        iteration_text = f"EoF Iteration {self.__optimization_problem.GetStep()}"
        iteration_output = f"# {iteration_text} [Time: {now_str}] #"

        divider_line = len(iteration_output) * '#'

        to_print = f"{divider_line}\n{iteration_output}\n{divider_line}\n"

        Kratos.Logger.PrintInfo(to_print)
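To make the sign convention concrete: the standardization above maps every constraint onto NLOPT's g(x) <= 0 (inequality) or h(x) = 0 (equality) form. Below is a standalone sketch of that mapping, simplified from the class above; names are illustrative, it is not a line-for-line copy, and it ignores the "initial_value" reference mode:

import sys

def standardize(value: float, ref_value: float, constraint_type: str) -> float:
    # ">" constraints are flipped so that a violation always comes out positive
    scaling = -1.0 if constraint_type == ">" else 1.0
    factor = scaling
    if abs(ref_value) > sys.float_info.epsilon:
        factor /= abs(ref_value)  # normalize by the reference magnitude
    return (value - ref_value) * factor

# a hypothetical "mass < 10.0" constraint:
print(standardize(12.0, 10.0, "<"))  # 0.2  -> violated by 20 %
print(standardize( 8.0, 10.0, "<"))  # -0.2 -> satisfied with 20 % margin

Normalizing by |ref_value| keeps constraints with very different magnitudes on a comparable scale, which is why GetAbsoluteViolation() can report the same quantity as a percentage.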
