Skip to content

Commit

Permalink
Merge pull request #10081 from KratosMultiphysics/eFlowsReleaseM20
Browse files Browse the repository at this point in the history
Adding features for release of eFlows4HPC European Project
  • Loading branch information
SADPR authored Jul 28, 2022
2 parents 55d962d + fb0e67b commit 286b39d
Show file tree
Hide file tree
Showing 7 changed files with 533 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import dislib as ds
from dislib.data.array import Array

def load_blocks_array(blocks, shape, block_size):
if shape[0] < block_size[0] or shape[1] < block_size[1]:
raise ValueError("The block size is greater than the ds-array")
return Array(blocks, shape=shape, top_left_shape=block_size,
reg_shape=block_size, sparse=False)

def load_blocks_rechunk(blocks, shape, block_size, new_block_size):
if shape[0] < new_block_size[0] or shape[1] < new_block_size[1]:
raise ValueError("The block size requested for rechunk"
"is greater than the ds-array")
final_blocks = [[]]
# Este bucle lo puse por si los Future objects se guardan en una lista, en caso de que la forma de guardarlos cambie, también cambiará un poco este bucle.
# Si blocks se pasa ya como (p. ej) [[Future_object, Future_object]] no hace falta.
for block in blocks:
final_blocks[0].append(block)
arr = load_blocks_array(final_blocks, shape, block_size)
return arr.rechunk(new_block_size)
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@
import KratosMultiphysics.FluidDynamicsApplication
import KratosMultiphysics.RomApplication as romapp
from KratosMultiphysics.RomApplication import python_solvers_wrapper_rom as solver_wrapper
from KratosMultiphysics.RomApplication.empirical_cubature_method import EmpiricalCubatureMethod
from KratosMultiphysics.FluidDynamicsApplication.fluid_dynamics_analysis import FluidDynamicsAnalysis

import json
import numpy as np

class FluidDynamicsAnalysisROM(FluidDynamicsAnalysis):

def __init__(self,model,project_parameters):
def __init__(self,model,project_parameters, path = '.', hyper_reduction_element_selector = None):
KratosMultiphysics.Logger.PrintWarning('\x1b[1;31m[DEPRECATED CLASS] \x1b[0m',"\'FluidDynamicsAnalysisROM\'", "class is deprecated. Use the \'RomAnalysis\' one instead.")
self.path = path
super().__init__(model,project_parameters)
if hyper_reduction_element_selector != None :
if hyper_reduction_element_selector == "EmpiricalCubature":
self.hyper_reduction_element_selector = EmpiricalCubatureMethod()
self.time_step_residual_matrix_container = []
else:
err_msg = "The requested element selection method \"" + hyper_reduction_element_selector + "\" is not in the rom application\n"
err_msg += "Available options are: \"EmpiricalCubature\""
raise Exception(err_msg)
else:
self.hyper_reduction_element_selector = None



#### Internal functions ####
def _CreateSolver(self):
""" Create the Solver (and create and import the ModelPart if it is not alread in the model) """
## Solver construction
with open('RomParameters.json') as rom_parameters:
with open(self.path +'/RomParameters.json') as rom_parameters:
rom_settings = KratosMultiphysics.Parameters(rom_parameters.read())
self.project_parameters["solver_settings"].AddValue("rom_settings", rom_settings["rom_settings"])
return solver_wrapper.CreateSolverByParameters(self.model, self.project_parameters["solver_settings"],self.project_parameters["problem_data"]["parallel_type"].GetString())
Expand All @@ -28,7 +43,7 @@ def ModifyAfterSolverInitialize(self):
"""Here is where the ROM_BASIS is imposed to each node"""
super().ModifyAfterSolverInitialize()
computing_model_part = self._solver.GetComputingModelPart()
with open('RomParameters.json') as f:
with open(self.path + '/RomParameters.json') as f:
data = json.load(f)
nodal_dofs = len(data["rom_settings"]["nodal_unknowns"])
nodal_modes = data["nodal_modes"]
Expand All @@ -42,3 +57,31 @@ def ModifyAfterSolverInitialize(self):
aux[j,i] = nodal_modes[Counter][j][i]
node.SetValue(romapp.ROM_BASIS, aux ) # ROM basis
counter+=1
if self.hyper_reduction_element_selector != None:
if self.hyper_reduction_element_selector.Name == "EmpiricalCubature":
self.ResidualUtilityObject = romapp.RomResidualsUtility(self._GetSolver().GetComputingModelPart(), self.project_parameters["solver_settings"]["rom_settings"], self._GetSolver()._GetScheme())


def FinalizeSolutionStep(self):
if self.hyper_reduction_element_selector != None:
if self.hyper_reduction_element_selector.Name == "EmpiricalCubature":
print('\n\n\n\nGenerating matrix of residuals')
ResMat = self.ResidualUtilityObject.GetResiduals()
NP_ResMat = np.array(ResMat, copy=False)
self.time_step_residual_matrix_container.append(NP_ResMat)
super().FinalizeSolutionStep()

def Finalize(self):
super().Finalize()
if self.hyper_reduction_element_selector != None:
if self.hyper_reduction_element_selector.Name == "EmpiricalCubature":
self.residuals_snapshots = self._ObtainBasis()

def _ObtainBasis(self):
### Building the Snapshot matrix ####
for i in range (len(self.time_step_residual_matrix_container)):
if i == 0:
SnapshotMatrix = self.time_step_residual_matrix_container[i]
else:
SnapshotMatrix = np.c_[SnapshotMatrix, self.time_step_residual_matrix_container[i]]
return SnapshotMatrix
88 changes: 88 additions & 0 deletions applications/RomApplication/python_scripts/parallel_svd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import dislib as ds
import numpy as np
from pycompss.api.task import task
from pycompss.api.constraint import constraint
from pycompss.api.api import compss_wait_on
from pycompss.api.parameter import Type, COLLECTION_IN, Depth
from dislib.data.array import Array

from KratosMultiphysics.RomApplication.auxiliary_functions_workflow import load_blocks_array, load_blocks_rechunk


@constraint(computingUnits=2)
@task(Y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=2)
def my_qr(Y_blocks):
Y = np.block(Y_blocks)
Q,R = np.linalg.qr(Y, mode='reduced')
return Q,R


@constraint(computingUnits=1)
@task(B_blocks={Type: COLLECTION_IN, Depth: 2}, returns=2)
def my_svd(B_blocks):
B = np.block(B_blocks)
U_hat, s, _ = np.linalg.svd(B, full_matrices=False)
return U_hat, s




def rsvd(A,desired_rank,oversampling=10,row_splits=10,column_splits=1):

#-----Dimensions--------
k = desired_rank
p = k + oversampling
n = A.shape[0]
m = A.shape[1]
A_row_chunk_size = int( n / row_splits)
A_column_chunk_size = int( m / column_splits)
A = A.rechunk((A_row_chunk_size,A_column_chunk_size))
#-----Matrix Omega Initialization--------
omega_column_chunk_size = p
omega_row_chunk_size = A_column_chunk_size
Omega = ds.random_array(shape=(m, p), block_size=(omega_row_chunk_size, omega_column_chunk_size) ) # Create a random projection matrix Omega of size mxp, for this test, p is of 110.
#----------------------------------

# STEP 1 (DISTRIBUTED): Sample the column space of A. Y results into an nxp matrix.
Y = A @ Omega

# STEP 2 (SERIAL): Serial QR, to be done in distributed. Q results into a matrix of nxp
Q,_ = my_qr(Y._blocks)
Q=load_blocks_rechunk([Q], shape = (n, p), block_size= (n, p), new_block_size=(A_row_chunk_size, omega_column_chunk_size))

#"SERIAL STEPS FINISH"
B = Q.T @ A
# STEP 3 (DISTRIBUTED): Project A into the orthonormal basis. B results into a matrix of pxm.

#""""SERIAL STEPS START"""
U_hat, s = my_svd(B._blocks)
U_hat = load_blocks_rechunk([U_hat], shape = (p, m), block_size = (p, m), new_block_size=(omega_column_chunk_size, omega_column_chunk_size))

#""""SERIAL STEPS FINISH"""
U_hat = U_hat[:,:k] #U_hat results into a matrix of pxk.
U = Q @ U_hat #STEP 5 (DISTRIBUTED): Project the reduced basis into Q. U results into a matrix of nxk.
U = U.collect() #STEP 6 (collecting the required basis):
s = compss_wait_on(s)
s = s[:k]
return U, s

def truncated_svd(Matrix, epsilon=0):
Matrix = compss_wait_on(Matrix)
M,N=np.shape(Matrix)
dimMATRIX = max(M,N)
U, s, V = np.linalg.svd(Matrix, full_matrices=False) #U --> M xN, V --> N x N
V = V.T
tol = dimMATRIX*np.finfo(float).eps*max(s)/2
R = np.sum(s > tol) # Definition of numerical rank
if epsilon == 0:
K = R
else:
SingVsq = np.multiply(s,s)
SingVsq.sort()
normEf2 = np.sqrt(np.cumsum(SingVsq))
epsilon = epsilon*normEf2[-1] #relative tolerance
T = (sum(normEf2<epsilon))
K = len(s)-T
K = min(R,K)
return U[:, :K], s[:K], V[:, :K]

Loading

0 comments on commit 286b39d

Please sign in to comment.