Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GeoMechanicsApplication] Addressed several recently introduced code smells #11546

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
namespace Kratos
{

int ConstitutiveLawUtilities::GetStateVariableIndex(const Variable<double> &rThisVariable) {
int ConstitutiveLawUtilities::GetStateVariableIndex(const Variable<double>& rThisVariable)
{
int index = -1;

const std::string prefix{"STATE_VARIABLE_"};
if (rThisVariable.Name().substr(0, prefix.length()) == prefix) {
if (const std::string prefix{"STATE_VARIABLE_"};
rThisVariable.Name().substr(0, prefix.length()) == prefix) {
index = std::stoi(rThisVariable.Name().substr(prefix.length()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class GeoFlowApplyConstantScalarValueProcess : public Kratos::ApplyConstantScala
const Kratos::Variable<double>& rVariable,
double DoubleValue,
std::size_t MeshId,
const Flags Options)
: Kratos::ApplyConstantScalarValueProcess(rModelPart, rVariable, DoubleValue, MeshId, Options)
const Flags& rOptions)
: Kratos::ApplyConstantScalarValueProcess(rModelPart, rVariable, DoubleValue, MeshId, rOptions)
{}

bool hasWaterPressure()
Expand All @@ -49,9 +49,9 @@ class GeoFlowApplyConstantScalarValueProcess : public Kratos::ApplyConstantScala
class GeoFlowApplyConstantHydrostaticPressureProcess : public Kratos::ApplyConstantHydrostaticPressureProcess
{
public:
GeoFlowApplyConstantHydrostaticPressureProcess(Kratos::ModelPart& rModelPart,
Kratos::Parameters Settings)
: Kratos::ApplyConstantHydrostaticPressureProcess(rModelPart, Settings)
GeoFlowApplyConstantHydrostaticPressureProcess(Kratos::ModelPart& rModelPart,
const Kratos::Parameters& rSettings)
: Kratos::ApplyConstantHydrostaticPressureProcess(rModelPart, rSettings)
{}

Kratos::ModelPart &GetModelPart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def _ConstructScheme(self, scheme_type, solution_type):
KratosMultiphysics.Logger.PrintInfo("GeoMechanics_U_Pw_Solver, scheme", "Quasi-Damped.")
scheme = KratosGeo.NewmarkQuasistaticDampedUPwScheme(beta,gamma,theta)
else:
raise Exception("Undefined solution type", solution_type)
raise RuntimeError(f"Undefined solution type '{solution_type}'")
elif (scheme_type.lower() == "backward_euler"or scheme_type.lower() == "backward-euler"):
if (solution_type.lower() == "quasi-static" or solution_type.lower() == "quasi_static"):
KratosMultiphysics.Logger.PrintInfo("GeoMechanics_U_Pw_Solver, scheme", "Backward Euler.")
Expand All @@ -210,9 +210,9 @@ def _ConstructScheme(self, scheme_type, solution_type):
KratosMultiphysics.Logger.PrintInfo("GeoMechanics_U_Pw_Solver, scheme", "Backward Euler.")
scheme = KratosGeo.BackwardEulerQuasistaticUPwScheme()
else:
raise Exception("Undefined/incompatible solution type with Backward Euler", solution_type)
raise RuntimeError(f"Undefined/incompatible solution type with Backward Euler: '{solution_type}'")
else:
raise Exception("Apart from Newmark, other scheme_type are not available.")
raise RuntimeError("Apart from Newmark, other scheme_type are not available.")

return scheme

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def _ExecuteCheckAndPrepare(self):
if materials_imported:
KratosMultiphysics.Logger.PrintInfo("::[GeoMechanicalSolver]:: ", "Constitutive law was successfully imported.")
else:
raise Exception("::[GeoMechanicalSolver]:: ", "Constitutive law was not imported.")
raise RuntimeError("::[GeoMechanicalSolver]:: Constitutive law was not imported.")

def _SetBufferSize(self):
required_buffer_size = max( self.settings["buffer_size"].GetInt(), self.GetMinimumBufferSize())
Expand All @@ -423,14 +423,14 @@ def _FillBuffer(self):
delta_time = self.main_model_part.ProcessInfo[KratosMultiphysics.DELTA_TIME]
step = self.main_model_part.ProcessInfo[KratosMultiphysics.STEP]

step = step - (buffer_size-1)*1
step -= (buffer_size - 1)
self.main_model_part.ProcessInfo.SetValue(KratosMultiphysics.STEP, step)
time = time - (buffer_size-1)*delta_time
time -= ((buffer_size - 1) * delta_time)
self.main_model_part.ProcessInfo.SetValue(KratosMultiphysics.TIME, time)
for i in range(buffer_size-1):
step = step + 1
for _ in range(buffer_size - 1):
step += 1
self.main_model_part.ProcessInfo.SetValue(KratosMultiphysics.STEP, step)
time = time + delta_time
time += delta_time
self.main_model_part.CloneTimeStep(time)

def _ConstructLinearSolver(self):
Expand Down Expand Up @@ -539,6 +539,6 @@ def _ConstructSolver(self, builder_and_solver, strategy_type):
move_mesh_flag)

else:
raise Exception("Undefined strategy type", strategy_type)
raise RuntimeError(f"Undefined strategy type '{strategy_type}'")

return solving_strategy
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ def test_side_pressure_boundaries(self):
# run simulation
simulation = test_helper.run_kratos(file_path)
# read results
result_file_path = os.path.join(file_path, "test_phreatic.post.res")
simulation_output = test_helper.read_numerical_results_from_post_res(result_file_path)
reader = test_helper.GiDOutputFileReader()
simulation_output = reader.read_output_from(os.path.join(file_path, "test_phreatic.post.res"))
# compare results
self.parameters()
self.compare_effective_stresses(simulation_output, simulation)
Expand Down
203 changes: 114 additions & 89 deletions applications/GeoMechanicsApplication/tests/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,92 +414,117 @@ def find_closest_index_greater_than_value(input_list, value):
return None


def read_numerical_results_from_post_res(project_path):
"""
Reads the numerical results from a Kratos "post.res" output file
:param project_path: path to the result file
:return: Puts the nodal and gauss point results into a dictionary sorted by result name for every time step
"""
output_data = {"GaussPoints": {},
"results": {}}
with open(project_path, "r") as result_file:
gauss_points_name = None
result_name = None
result_type = None
current_block_name = None
result_location = None
current_integration_point = 0
for line in result_file:
line = line.strip()
if line.startswith("GaussPoints"):
current_block_name = "GaussPoints"
words = line.split()
gauss_points_name = words[1][1:-1] # strip off enclosing double quotes
output_data["GaussPoints"][gauss_points_name] = {}
continue

if line == "End GaussPoints":
current_block_name = None
gauss_points_name = None
continue

if line.startswith("Result"):
words = line.split()
result_name = words[1][1:-1] # strip off enclosing double quotes
if result_name not in output_data["results"]:
output_data["results"][result_name] = []
result_type = words[4]
result_location = words[5]
this_result = {"time": float(words[3]),
"location": result_location,
"values": []}
output_data["results"][result_name].append(this_result)
if result_location == "OnGaussPoints":
current_integration_point = 0
gauss_points_name = words[6][1:-1] # strip off enclosing double quotes
continue

if line == "Values":
current_block_name = "Values"
continue

if line == "End Values":
current_block_name = None
continue

if current_block_name == "GaussPoints":
if line.startswith("Number Of Gauss Points:"):
pos = line.index(":")
num_gauss_points = int(line[pos+1:].strip())
output_data["GaussPoints"][gauss_points_name]["size"] = num_gauss_points
continue

if current_block_name == "Values":
words = line.split()

if result_location == "OnNodes":
value = {"node": int(words[0])}
if result_type == "Scalar":
value["value"] = float(words[1])
elif result_type == "Vector":
value["value"] = [float(x) for x in words[1:]]

output_data["results"][result_name][-1]["values"].append(value)
elif result_location == "OnGaussPoints":
current_integration_point %= output_data["GaussPoints"][gauss_points_name]["size"]
current_integration_point += 1
if current_integration_point == 1:
value = {"element": int(words[0]),
"value": []}
output_data["results"][result_name][-1]["values"].append(value)
words.pop(0)

value = output_data["results"][result_name][-1]["values"][-1]["value"]
if result_type == "Scalar":
value.append(float(words[0]))
elif result_type == "Matrix":
value.append([float(v) for v in words])

continue
return output_data
class GiDOutputFileReader:
def __init__(self):
self._reset_internal_state()

def read_output_from(self, gid_output_file_path):
self._reset_internal_state()

with open(gid_output_file_path, "r") as result_file:
for line in result_file:
line = line.strip()
if line.startswith("GaussPoints"):
self._process_begin_of_gauss_points(line)
elif line == "End GaussPoints":
self._process_end_of_gauss_points(line)
elif line.startswith("Result"):
self._process_result_header(line)
elif line == "Values":
self._process_begin_of_block(line)
elif line == "End Values":
self._process_end_of_block(line)
elif self.current_block_name == "GaussPoints":
self._process_gauss_point_data(line)
elif self.current_block_name == "Values":
self._process_value_data(line)

return self.output_data

def _reset_internal_state(self):
self.output_data = {}
self.current_block_name = None
self.result_name = None
self.result_type = None
self.result_location = None
self.gauss_points_name = None
self.current_integration_point = 0

def _process_begin_of_gauss_points(self, line):
self._process_begin_of_block(line)
self.gauss_points_name = self._strip_off_quotes(line.split()[1])
if self.current_block_name not in self.output_data:
self.output_data[self.current_block_name] = {}
self.output_data[self.current_block_name][self.gauss_points_name] = {}

def _process_end_of_gauss_points(self, line):
self._process_end_of_block(line)
self.gauss_points_name = None

def _process_result_header(self, line):
if "results" not in self.output_data:
self.output_data["results"] = {}
words = line.split()
self.result_name = self._strip_off_quotes(words[1])
if self.result_name not in self.output_data["results"]:
self.output_data["results"][self.result_name] = []
self.result_type = words[4]
self.result_location = words[5]
this_result = {"time": float(words[3]),
"location": self.result_location,
"values": []}
self.output_data["results"][self.result_name].append(this_result)
if self.result_location == "OnGaussPoints":
self.current_integration_point = 0
self.gauss_points_name = self._strip_off_quotes(words[6])

def _process_gauss_point_data(self, line):
if line.startswith("Number Of Gauss Points:"):
pos = line.index(":")
num_gauss_points = int(line[pos+1:].strip())
self.output_data[self.current_block_name][self.gauss_points_name]["size"] = num_gauss_points

def _process_value_data(self, line):
words = line.split()
if self.result_location == "OnNodes":
self._process_nodal_result(words)
elif self.result_location == "OnGaussPoints":
self._process_gauss_point_result(words)

def _process_nodal_result(self, words):
value = {"node": int(words[0])}
if self.result_type == "Scalar":
value["value"] = float(words[1])
elif self.result_type == "Vector":
value["value"] = [float(x) for x in words[1:]]
self.output_data["results"][self.result_name][-1]["values"].append(value)

def _process_gauss_point_result(self, words):
self.current_integration_point %= self.output_data["GaussPoints"][self.gauss_points_name]["size"]
self.current_integration_point += 1
if self.current_integration_point == 1:
value = {"element": int(words[0]),
"value": []}
self.output_data["results"][self.result_name][-1]["values"].append(value)
words.pop(0)

value = self.output_data["results"][self.result_name][-1]["values"][-1]["value"]
if self.result_type == "Scalar":
value.append(float(words[0]))
elif self.result_type == "Matrix":
value.append([float(x) for x in words])

def _process_begin_of_block(self, line):
assert(self.current_block_name is None) # nested blocks are not supported
self.current_block_name = line.split()[0]

def _process_end_of_block(self, line):
words = line.split()
assert(words[0] == "End")
assert(self.current_block_name == words[1])
self.current_block_name = None

def _strip_off_quotes(self, quoted_string):
assert(quoted_string[0] == '"')
assert(quoted_string[-1] == '"')
return quoted_string[1:-1]