Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cleaning Pipelines] MINOR improvements in allocation of resources #1576

Merged
merged 1 commit into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 83 additions & 59 deletions scripts/builtin/bandit.dml
Original file line number Diff line number Diff line change
Expand Up @@ -53,46 +53,55 @@

m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_test, Matrix[Double] Y_test, List[Unknown] metaList,
String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] lp, Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3,
Integer R=50, Double baseLineScore, Boolean cv, Integer cvk = 2, Double ref = 0, Boolean enablePruning = FALSE, Boolean verbose = TRUE, String output="")
Integer R=50, Double baseLineScore, Boolean cv, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean verbose = TRUE, String output="")
return(Boolean perf)
# return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy, Frame[String] feaFrameOuter)
{
print("Starting optimizer")
totalPruneCount = 0
FLAG_VARIABLE = 5
pipelines_executed = 0
HYPERPARAM_LENGTH = (ncol(lp) * FLAG_VARIABLE * 3) + 1 ## num of col in logical * 5 meat flag vars * max hyperparam per op + 1 accuracy col
HYPERPARAM_LENGTH = ((ncol(lp) + 2) * FLAG_VARIABLE * 3) + 1 ## num of col in logical * 5 meat flag vars * max hyperparam per op + 1 accuracy col
bestPipeline = frame("", rows=1, cols=1)
bestHyperparams = as.matrix(0)
bestAccuracy = as.matrix(0)
# initialize bandit variables
# variable names follow publication where algorithm is introduced
eta = 2 # the halving ratio is fixed to 2
s_max = floor(log(R,eta)) - 1;
s_max = floor(log(R,eta));
# # compute weights for R and then increase/decrease R with respect to importance of configurations

weight = matrix(1/s_max , rows=s_max, cols=1)
weight = cumsum(weight)
# weight = matrix(1, rows=s_max, cols=1)
# print("weight matrix: "+toString(weight))
# initialize output variables
hparam = matrix(0, rows=k*(s_max+1), cols=HYPERPARAM_LENGTH)
pipeline = frame(0, rows=k*(s_max+1), cols=ncol(lp)+1)
endIdx = matrix(k, rows=(s_max+1), cols=1)
hparam = matrix(0, rows=k*(s_max), cols=HYPERPARAM_LENGTH)
pipeline = matrix(0, rows=k*(s_max), cols=4)
endIdx = matrix(k, rows=(s_max), cols=1)
endIdx = cumsum(endIdx)
startIdx = (endIdx - k) + 1

n = ifelse(s_max >= nrow(lp), nrow(lp), n = ceil(nrow(lp)/(s_max + 1));)

for(s in s_max:0) { # TODO convert to parfor
n = ifelse(s_max >= nrow(lp), nrow(lp), n = ceil(nrow(lp)/s_max);)
pipelineId = as.frame(seq(1, nrow(lp)))
lp = cbind(pipelineId, lp)
mainLookup = lp
B = (s_max + 1) * R;
s_max = s_max - 1
idx = 1
for(s in s_max:0, check = 0) { # TODO convert to parfor

# result variables
bracket_hp = matrix(0, rows=k*(s+1)+k, cols=HYPERPARAM_LENGTH)
bracket_pipel = matrix(0, rows=k*(s+1)+k, cols=3)
bracket_pipel = matrix(0, rows=k*(s+1)+k, cols=4)
start=1; end=0;

# # compute the number of initial pipelines n
r = R * eta^(-s);
r = max(R * as.scalar(weight[((s_max - s) + 1)]) * eta^(-s), 1);
configurations = lp[1:(min(n, nrow(lp)))]

# append configuration keys for extracting the pipeline later on
id = seq(1, nrow(configurations))
configurations = cbind(as.frame(id), configurations)
# save the original configuration as a lookup table
lookup = configurations

for(i in 0:s) {
# successive halving
Expand All @@ -107,11 +116,13 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Doubl
configurations = configurations[1:n_i, ]
pipelines_executed = pipelines_executed + (n_i * r_i)
[outPip,outHp, pruneCount] = run_with_hyperparam(ph_pip=configurations, r_i=r_i, X=X_train, Y=Y_train, Xtest=X_test, Ytest=Y_test, metaList=metaList,
evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, param=param, cv=cv, cvk=cvk, ref=ref, enablePruning=enablePruning)
evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, param=param, cv=cv, cvk=cvk, ref=ref, seed = seed, enablePruning=enablePruning)
totalPruneCount = totalPruneCount + pruneCount
# sort the pipelines by order of accuracy decreasing
a = order(target = outPip, by = 1, decreasing=TRUE, index.return=FALSE)
b = order(target = outHp, by = 1, decreasing=TRUE, index.return=FALSE)
IX = order(target = outPip, by = 1, decreasing=TRUE, index.return=TRUE)
P = table(seq(1,nrow(IX)), IX, nrow(IX), nrow(outPip));
a = P %*% outPip
b = P %*% outHp
rowIndex = min(k, nrow(a))

# maintain the brackets results
Expand All @@ -122,26 +133,27 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Doubl

# sort the configurations for successive halving
avergae_perf = getMaxPerConf(outPip, nrow(configurations))
sortMask = matrix(1, rows=1, cols=ncol(configurations))
configurations = frameSort(cbind(avergae_perf, configurations), cbind(as.matrix(0), sortMask), TRUE)
sortMask = matrix(1, rows=1, cols=ncol(configurations) + 1)
sortMask[1,1] = 0
configurations = frameSort(cbind(avergae_perf, configurations), sortMask, TRUE)
configurations = configurations[, 2:ncol(configurations)]
}
if(n < nrow(lp))
lp = lp[n+1:nrow(lp),]
bracket_pipel = removeEmpty(target=bracket_pipel, margin="rows")
bracket_hp = removeEmpty(target=bracket_hp, margin="rows")
# keep the best k results for each bracket
[bracket_bestPipeline, bracket_bestHyperparams] = extractBracketWinners(bracket_pipel, bracket_hp, k, lookup)
[bracket_bestPipeline, bracket_bestHyperparams] = extractBracketWinners(bracket_pipel, bracket_hp, k)
# optimize by the features
startOut = as.scalar(startIdx[s+1])
endOut = min(as.scalar(endIdx[s+1]), (startOut + nrow(bracket_bestPipeline) - 1))
startOut = as.scalar(startIdx[idx])
endOut = min(as.scalar(endIdx[idx]), (startOut + nrow(bracket_bestPipeline) - 1))
pipeline[startOut:endOut, ] = bracket_bestPipeline
hparam[startOut:endOut, 1:ncol(bracket_bestHyperparams)] = bracket_bestHyperparams
idx = idx + 1
# print("bracket best: \n"+toString(bracket_bestPipeline))
}
[bestPipeline, bestHyperparams] = extractTopK(pipeline, hparam, baseLineScore, k)
bestAccuracy = as.matrix(bestPipeline[,1])
bestHyperparams = bestHyperparams[,2:ncol(bestHyperparams)]
bestPipeline = bestPipeline[, 2:ncol(bestPipeline)]
[bestPipeline, bestHyperparams, bestAccuracy] = extractTopK(pipeline, hparam, baseLineScore, k, mainLookup)

imp = as.double(as.scalar(bestAccuracy[1, 1])) - as.double(baseLineScore)
perf = imp > 0
applyFunc = bestPipeline
Expand Down Expand Up @@ -212,31 +224,32 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
}

# this method will call the execute pipelines with their hyper-parameters
run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y,
run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i = 1, Matrix[Double] X, Matrix[Double] Y,
Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
Frame[Unknown] param, Boolean cv, Integer cvk = 2, Double ref = 0, Boolean enablePruning = FALSE, Boolean default = FALSE)
Frame[Unknown] param, Boolean cv = FALSE, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean default = FALSE)
return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Integer pruneCount, Matrix[Double] changesByPipMatrix)
{
# # # TODO there is a partial overlap but it is negligible so we will not rewrite the scripts but lineage based reuse will get rid of it
changesByPipMatrix = matrix(0, rows=nrow(ph_pip) * r_i, cols=1)
pruneCount = 0
output_hp = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)-1) * 5 * 3)
output_hp = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3)
output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
output_pipelines = matrix(0, nrow(ph_pip)*r_i, 2)
output_pipelines = matrix(0, nrow(ph_pip)*r_i, 3)
# rows in validation set
clone_X = X
clone_Y = Y
clone_Xtest = Xtest
clone_Ytest = Ytest
index = 1
id = as.matrix(ph_pip[, 1])
ph_pip = ph_pip[, 2:ncol(ph_pip)]
ids = as.matrix(ph_pip[, 1:2])
ph_pip = ph_pip[, 3:ncol(ph_pip)]

parfor(i in 1:nrow(ph_pip), check = 0)
{
# execute configurations with r resources
op = removeEmpty(target=ph_pip[i], margin="cols")
print("PIPELINE EXECUTION START ... "+toString(op))
[hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, enablePruning)
# print("PIPELINE EXECUTION START ... "+toString(op))
[hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, seed, enablePruning)
hpForPruning = matrix(0, rows=1, cols=ncol(op))
changesByOp = matrix(0, rows=1, cols=ncol(op))
metaList2 = metaList; #ensure metaList is no result var
Expand Down Expand Up @@ -279,13 +292,12 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
}

# evalFunOutput = eval(evaluationFunc, argList)
accT = floor((time() - t1) / 1e+6)
matrix_width = as.matrix(nrow(hp_matrix) * ncol(hp_matrix))
hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE))
index = (i - 1) * no_of_res + r
output_accuracy[index, 1] = accuracy
output_hp[index, 1:ncol(hp_vec)] = hp_vec
output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
output_pipelines[index, ] = cbind(as.matrix(index), ids[i,1:2])
}
else
{
Expand All @@ -303,10 +315,9 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
}

# extract the hyper-parameters for pipelines
getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Integer no_of_res, Boolean default, Boolean enablePruning)
getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Integer no_of_res, Boolean default, Integer seed = -1, Boolean enablePruning)
return (Matrix[Double] paramMatrix, Frame[Unknown] applyFunc, Integer no_of_res, Integer NUM_META_FLAGS)
{

allParam = 0;
NUM_META_FLAGS = 5
NUM_DEFAULT_VALUES = 4
Expand All @@ -330,7 +341,7 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Intege
# this matrix stores no. of hps, values of hps, and flags
paramMatrix = matrix(0, rows=ncol(pipeline)*no_of_res, cols=max(paramCount)+NUM_META_FLAGS+1)

for(i in 1:ncol(pipeline)) {
parfor(i in 1:ncol(pipeline), check=0) {
op = as.scalar(pipeline[1, i])
index = as.scalar(indexes[i])
no_of_param = as.integer(as.scalar(paramCount[i]))
Expand All @@ -354,21 +365,21 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Intege
minVal = as.scalar(hpList[index, paramValIndex])
maxVal = as.scalar(hpList[index, paramValIndex + 1])
if(type == "FP") {
val = rand(rows=no_of_res, cols=1, min=minVal, max=maxVal, pdf="uniform");
val = rand(rows=no_of_res, cols=1, min=minVal, max=maxVal, pdf="uniform", seed=seed);
OpParam[, j] = val;
}
else if(type == "INT") {
if(as.integer(maxVal) > no_of_res)
val = sample(as.integer(maxVal), no_of_res, FALSE)
val = sample(as.integer(maxVal), no_of_res, FALSE, seed)
else
val = sample(as.integer(maxVal), no_of_res, TRUE)
val = sample(as.integer(maxVal), no_of_res, TRUE, seed)
less_than_min = val < as.integer(minVal);
val = (less_than_min * minVal) + val;
OpParam[, j] = val;
}
else if(type == "BOOL") {
if(maxVal == 1) {
s = sample(2, no_of_res, TRUE);
s = sample(2, no_of_res, TRUE, seed);
b = s - 1;
OpParam[, j] = b;
}
Expand Down Expand Up @@ -400,25 +411,43 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Intege


# extract the top k pipelines as a final result after deduplication and sorting
extractTopK = function(Frame[Unknown] pipeline, Matrix[Double] hyperparam,
Double baseLineScore, Integer k)
return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams)
extractTopK = function(Matrix[Double] pipeline, Matrix[Double] hyperparam,
Double baseLineScore, Integer k, Frame[Unknown] mainLookup)
return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy)
{
hyperparam = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE)
pipeline = frameSort(pipeline, cbind(as.matrix(0), matrix(1, rows=1, cols=ncol(pipeline) - 1)), TRUE)
IX = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=TRUE)
P = table(seq(1,nrow(IX)), IX, nrow(IX), nrow(hyperparam));
hyperparam = P %*% hyperparam
pipeline = P %*% pipeline

# remove the row with accuracy less than test accuracy
mask = (hyperparam[, 1] < baseLineScore) == 0
if(sum(mask) == 0)
mask[1, 1] = 1
hyperparam = removeEmpty(target = hyperparam, margin = "rows", select = mask)
pipeline = removeEmpty(target = pipeline, margin = "rows", select = mask)

rowIndex = min(nrow(hyperparam), k)
# select the top k
bestPipeline = pipeline[1:rowIndex,]
bestHyperparams = hyperparam[1:rowIndex,]
bestAccuracy = pipeline[1:rowIndex, 1]
bestHyperparams = hyperparam[1:rowIndex, 2:ncol(hyperparam)]
pipeline = pipeline[1:rowIndex]
# # # lookup for the pipelines
pipCode = pipeline[, ncol(pipeline)]

bestPipeline = frame(data="0", rows=nrow(pipeline), cols=ncol(mainLookup))
parfor(i in 1: nrow(pipeline)) {
index = as.scalar(pipCode[i])
bestPipeline[i] = mainLookup[index]
}

bestPipeline = bestPipeline[, 2:ncol(bestPipeline)]

}

# extract the top k pipelines for each bracket, the intermediate results
extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperparam,
Integer k, Frame[String] conf)
return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams)
extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperparam, Integer k)
return (Matrix[Double] bestPipeline, Matrix[Double] bestHyperparams)
{
# bestPipeline = frameSort(bestPipeline)
hyperparam = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE)
Expand All @@ -427,12 +456,7 @@ extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperpa

pipeline = pipeline[1:rowIndex,]
bestHyperparams = hyperparam[1:rowIndex,]
bestPipeline = frame(data="0", rows=nrow(pipeline), cols=ncol(conf))
parfor(i in 1: nrow(pipeline)) {
index = as.scalar(pipeline[i, 3])
bestPipeline[i] = conf[index]
bestPipeline[i, 1] = as.frame(pipeline[i, 1])
}
bestPipeline = pipeline[1:rowIndex]
}

###########################################################################
Expand Down
4 changes: 2 additions & 2 deletions scripts/builtin/executePipeline.dml
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Integ
return (Matrix[Double] X, Matrix[Double] Y)
{
# get the class count
for(k in 1:max(Y)) {
classes = table(Y, 1)
classes = table(Y, 1)
for(k in 1:nrow(classes) - 1) {
minClass = min(classes)
maxClass = max(classes)
diff = (maxClass - minClass)/sum(classes)
Expand Down
7 changes: 6 additions & 1 deletion scripts/builtin/msvm.dml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ m_msvm = function(Matrix[Double] X, Matrix[Double] Y, Boolean intercept = FALSE,
stop("MSVM: Invalid Y input, containing negative values")
if(verbose)
print("Running Multiclass-SVM")

# Robustness for datasets with missing values (causing NaN gradients)
numNaNs = sum(isNaN(X))
if( numNaNs > 0 ) {
print("msvm: matrix X contains "+numNaNs+" missing values, replacing with 0.")
X = replace(target=X, pattern=NaN, replacement=0);
}
num_rows_in_w = ncol(X)
if(intercept) {
# append once, and call l2svm always with intercept=FALSE
Expand Down
6 changes: 6 additions & 0 deletions scripts/builtin/msvmPredict.dml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@
m_msvmPredict = function(Matrix[Double] X, Matrix[Double] W)
return(Matrix[Double] YRaw, Matrix[Double] Y)
{
# Robustness for datasets with missing values
numNaNs = sum(isNaN(X))
if( numNaNs > 0 ) {
print("msvm: matrix X contains "+numNaNs+" missing values, replacing with 0.")
X = replace(target=X, pattern=NaN, replacement=0);
}
if(ncol(X) != nrow(W)){
if(ncol(X) + 1 != nrow(W)){
stop("MSVM Predict: Invalid shape of W ["+ncol(W)+","+nrow(W)+"] or X ["+ncol(X)+","+nrow(X)+"]")
Expand Down
12 changes: 6 additions & 6 deletions scripts/builtin/topk_cleaning.dml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
source("scripts/builtin/bandit.dml") as bandit;

s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives,
Frame[Unknown] parameters, String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5, Integer resource_val = 20, Integer max_iter = 10, Double sample = 1.0,
Double expectedIncrease=1.0, Integer seed = -1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, Boolean enablePruning = FALSE, String output)
Frame[Unknown] parameters, Frame[String] refSol = as.frame("NaN"), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5, Integer resource_val = 20, Integer max_iter = 10,
Double sample = 1.0, Double expectedIncrease=1.0, Integer seed = -1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, Boolean enablePruning = FALSE,
String output)
return(Boolean perf)
# return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical,
# Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp)
Expand Down Expand Up @@ -138,18 +139,17 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
metaList['distY'] = dist

print("-- Cleaning - Enum Logical Pipelines: ");
[bestLogical, con, refChanges] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest,
initial_population=logical, seed = seed, max_iter=max_iter, metaList = metaList,
[bestLogical, con, refChanges, acc] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest,
initial_population=logical, refSol=refSol, seed = seed, max_iter=max_iter, metaList = metaList,
evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, primitives=primitives, param=parameters,
dirtyScore = (dirtyScore + expectedIncrease), cv=cv, cvk=cvk, verbose=TRUE, ctx=ctx)
t6 = time(); print("---- finalized in: "+(t6-t5)/1e9+"s");

topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")

# # [topKPipelines, topKHyperParams, topKScores, features] =
perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest, metaList=metaList,
evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore,
k=topK, R=resource_val, cv=cv, cvk=cvk, ref=refChanges, enablePruning = enablePruning, output=output, verbose=TRUE);
k=topK, R=resource_val, cv=cv, cvk=cvk, ref=refChanges, seed=seed, enablePruning = enablePruning, output=output, verbose=TRUE);
t7 = time(); print("-- Cleaning - Enum Physical Pipelines: "+(t7-t6)/1e9+"s");
}

Expand Down
Loading