Skip to content

Commit

Permalink
fix(api): check worker model requirements on job processing (#4062)
Browse files Browse the repository at this point in the history
while computing the new job `addJobsToQueue`, the function `processNodeJobRunRequirements` now checks:
* the group of the worker model againt the `execGroups` computed on the node (close #4002)
* the binaries requirements against the worker model binary capabilitites

In case of error, the job is failed
  • Loading branch information
fsamin authored and bnjjj committed Mar 25, 2019
1 parent 901e836 commit dce008c
Show file tree
Hide file tree
Showing 6 changed files with 543 additions and 506 deletions.
24 changes: 12 additions & 12 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
spawnErrs.Join(*err)
}

_, next = observability.Span(ctx, "workflow.getNodeJobRunRequirements")
jobRequirements, containsService, modelType, err := getNodeJobRunRequirements(db, *job, run)
_, next = observability.Span(ctx, "workflow.processNodeJobRunRequirements")
jobRequirements, containsService, modelType, err := processNodeJobRunRequirements(db, *job, run, sdk.GroupsToIDs(groups))
next()
if err != nil {
spawnErrs.Join(*err)
Expand Down Expand Up @@ -419,22 +419,22 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
skippedOrDisabledJobs++
}

// If there is any error in the previous operation, mark the job as failed
if !spawnErrs.IsEmpty() {
failedJobs++
wjob.Status = sdk.StatusFail.String()
spawnInfos := sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoJobError.ID,
}

for _, e := range spawnErrs {
spawnInfos.Args = append(spawnInfos.Args, sdk.Cause(e).Error())
msg := sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoJobError.ID,
}
msg.Args = []interface{}{sdk.Cause(e).Error()}
wjob.SpawnInfos = append(wjob.SpawnInfos, sdk.SpawnInfo{
APITime: time.Now(),
Message: msg,
RemoteTime: time.Now(),
})
}

wjob.SpawnInfos = []sdk.SpawnInfo{sdk.SpawnInfo{
APITime: time.Now(),
Message: spawnInfos,
RemoteTime: time.Now(),
}}
} else {
wjob.SpawnInfos = []sdk.SpawnInfo{sdk.SpawnInfo{
APITime: time.Now(),
Expand Down
53 changes: 42 additions & 11 deletions engine/api/workflow/process_requirements.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@ import (
"github.com/ovh/cds/sdk/log"
)

// getNodeJobRunRequirements returns requirements list interpolated, and true or false if at least
// processNodeJobRunRequirements returns requirements list interpolated, and true or false if at least
// one requirement is of type "Service"
func getNodeJobRunRequirements(db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun) (sdk.RequirementList, bool, string, *sdk.MultiError) {
requirements := sdk.RequirementList{}
tmp := map[string]string{}
errm := &sdk.MultiError{}

func processNodeJobRunRequirements(db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun, execsGroupIDs []int64) (sdk.RequirementList, bool, string, *sdk.MultiError) {
var requirements sdk.RequirementList
var errm sdk.MultiError
var containsService bool
var model string
for _, v := range run.BuildParameters {
tmp[v.Name] = v.Value
}
var tmp = sdk.ParametersToMap(run.BuildParameters)

for _, v := range j.Action.Requirements {
name, errName := interpolate.Do(v.Name, tmp)
Expand All @@ -36,29 +32,64 @@ func getNodeJobRunRequirements(db gorp.SqlExecutor, j sdk.Job, run *sdk.Workflow
errm.Append(errValue)
continue
}
sdk.AddRequirement(&requirements, v.ID, name, v.Type, value)

if v.Type == sdk.ServiceRequirement {
containsService = true
}
if v.Type == sdk.ModelRequirement {
// It is forbidden to have more than one model requirement.
if model != "" {
errm.Append(sdk.ErrInvalidJobRequirementDuplicateModel)
break
}
model = value
}

sdk.AddRequirement(&requirements, v.ID, name, v.Type, value)
}

var modelType string
if model != "" {
// Load the worker model
wm, err := worker.LoadWorkerModelByName(db, model)
if err != nil {
log.Error("getNodeJobRunRequirements> error while getting worker model %s: %v", model, err)
errm.Append(sdk.ErrNoWorkerModel)
} else {
// Check that the worker model is in an exec group
if !sdk.IsInInt64Array(wm.GroupID, execsGroupIDs) {
errm.Append(sdk.ErrInvalidJobRequirementWorkerModelPermission)
}

// Check that the worker model has the binaries capabilitites
// only if the worker model doesn't need registration
if !wm.NeedRegistration && !wm.CheckRegistration {
for _, req := range requirements {
if req.Type == sdk.BinaryRequirement {
var hasCapa bool
for _, cap := range wm.RegisteredCapabilities {
if cap.Value == req.Value {
hasCapa = true
break
}
}
if !hasCapa {
errm.Append(sdk.ErrInvalidJobRequirementWorkerModelCapabilitites)
break
}
}
}
}

modelType = wm.Type
}

}

if errm.IsEmpty() {
return requirements, containsService, modelType, nil
}
return requirements, containsService, modelType, errm
return requirements, containsService, modelType, &errm
}

func prepareRequirementsToNodeJobRunParameters(reqs sdk.RequirementList) []sdk.Parameter {
Expand Down
Loading

0 comments on commit dce008c

Please sign in to comment.