Skip to content

Commit

Permalink
fix: cr
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt committed Dec 29, 2021
1 parent d91e173 commit 6094e20
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 113 deletions.
6 changes: 3 additions & 3 deletions engine/api/workflow/dao_node_job_run_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func LoadNodeRunJobInfo(ctx context.Context, db gorp.SqlExecutor, nodeRunID int6
// a temporary data, as workflow_node_job_run table. After the end of the Job,
// swpawninfos values will be in WorfklowRun table in stages column
func insertNodeRunJobInfo(db gorp.SqlExecutor, info *sdk.WorkflowNodeJobRunInfo) error {
spawnJSON, errJ := json.Marshal(info.SpawnInfos)
if errJ != nil {
return sdk.WrapError(errJ, "insertNodeRunJobInfo> cannot Marshal")
spawnJSON, err := json.Marshal(info.SpawnInfos)
if err != nil {
return sdk.WithStack(err)
}

query := "insert into workflow_node_run_job_info (workflow_node_run_id, workflow_node_run_job_id, spawninfos, created) values ($1, $2, $3, $4)"
Expand Down
10 changes: 7 additions & 3 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,16 @@ func PrepareSpawnInfos(infos []sdk.SpawnInfo) []sdk.SpawnInfo {
now := time.Now()
prepared := make([]sdk.SpawnInfo, 0)
for _, info := range infos {
prepared = append(prepared, sdk.SpawnInfo{
preparedInfo := sdk.SpawnInfo{
APITime: now,
RemoteTime: info.RemoteTime,
Message: info.Message,
UserMessage: info.Message.DefaultUserMessage(),
})
}
if preparedInfo.RemoteTime.IsZero() {
preparedInfo.RemoteTime = now
}
prepared = append(prepared, preparedInfo)
}
return prepared
}
Expand Down Expand Up @@ -275,7 +279,7 @@ func TakeNodeJobRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store
return nil, nil, sdk.WrapError(err, "cannot update worker_id in node job run %d", jobID)
}

if err := AddSpawnInfosNodeJobRun(db, job.WorkflowNodeRunID, jobID, PrepareSpawnInfos(infos)); err != nil {
if err := AddSpawnInfosNodeJobRun(db, job.WorkflowNodeRunID, jobID, infos); err != nil {
return nil, nil, sdk.WrapError(err, "cannot save spawn info on node job run %d", jobID)
}

Expand Down
19 changes: 3 additions & 16 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,28 +555,15 @@ jobLoop:
if !spawnErrs.IsEmpty() {
failedJobs++
wjob.Status = sdk.StatusFail

for _, e := range spawnErrs {
log.ErrorWithStackTrace(ctx, e)

msg := sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoJobError.ID,
}
msg.Args = []interface{}{sdk.ExtractHTTPError(e).Error()}
wjob.SpawnInfos = append(wjob.SpawnInfos, sdk.SpawnInfo{
APITime: time.Now(),
Message: msg,
RemoteTime: time.Now(),
UserMessage: msg.DefaultUserMessage(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobError.ID, Args: []interface{}{sdk.ExtractHTTPError(e).Error()}},
})
}
} else {
sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID}
wjob.SpawnInfos = []sdk.SpawnInfo{{
APITime: time.Now(),
Message: sp,
RemoteTime: time.Now(),
UserMessage: sp.DefaultUserMessage(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID},
}}
}

Expand All @@ -588,7 +575,7 @@ jobLoop:
}
next()

if err := AddSpawnInfosNodeJobRun(db, wjob.WorkflowNodeRunID, wjob.ID, PrepareSpawnInfos(wjob.SpawnInfos)); err != nil {
if err := AddSpawnInfosNodeJobRun(db, wjob.WorkflowNodeRunID, wjob.ID, wjob.SpawnInfos); err != nil {
return nil, sdk.WrapError(err, "cannot save spawn info job %d", wjob.ID)
}

Expand Down
24 changes: 6 additions & 18 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,32 +589,20 @@ queueRun:
})
require.NoError(t, err)

sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoHatcheryStarts.ID}
//AddSpawnInfosNodeJobRun
err := workflow.AddSpawnInfosNodeJobRun(db, j.WorkflowNodeRunID, j.ID, []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
},
})
err := workflow.AddSpawnInfosNodeJobRun(db, j.WorkflowNodeRunID, j.ID, []sdk.SpawnInfo{{
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoHatcheryStarts.ID},
}})
assert.NoError(t, err)
if t.Failed() {
t.FailNow()
}

sp = sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID}
//TakeNodeJobRun
takenJobID := j.ID
takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, takenJobID, "model", "worker", "1", []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
},
}, "hatchery_name")
takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, takenJobID, "model", "worker", "1", []sdk.SpawnInfo{{
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID},
}}, "hatchery_name")

//Load workflow node run
nodeRun, err := workflow.LoadNodeRunByID(context.Background(), db, takenJob.WorkflowNodeRunID, workflow.LoadRunOptions{})
Expand Down
91 changes: 48 additions & 43 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strconv"
"time"

"github.com/go-gorp/gorp"
"github.com/gorilla/mux"
"github.com/ovh/venom"
"github.com/rockbears/log"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/ovh/cds/engine/api/worker"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/featureflipping"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -107,7 +105,7 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
}

pbji := &sdk.WorkflowNodeJobRunData{}
report, err := takeJob(ctx, api.mustDB, api.Cache, p, id, workerModelName, pbji, wk, hatcheryName, api.Config.Secrets.SkipProjectSecretsOnRegion)
report, err := api.takeJob(ctx, p, id, workerModelName, pbji, wk, hatcheryName)
if err != nil {
return sdk.WrapError(err, "cannot takeJob nodeJobRunID:%d", id)
}
Expand Down Expand Up @@ -137,31 +135,24 @@ func (api *API) postTakeWorkflowJobHandler() service.Handler {
}
}

func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, p *sdk.Project, id int64, workerModel string, wnjri *sdk.WorkflowNodeJobRunData, wk *sdk.Worker, hatcheryName string, skipProjectSecretsOnRegion []string) (*workflow.ProcessorReport, error) {
tx, err := dbFunc().Begin()
func (api *API) takeJob(ctx context.Context, p *sdk.Project, id int64, workerModel string, wnjri *sdk.WorkflowNodeJobRunData, wk *sdk.Worker, hatcheryName string) (*workflow.ProcessorReport, error) {
tx, err := api.mustDB().Begin()
if err != nil {
return nil, sdk.WrapError(err, "cannot start transaction")
}
defer tx.Rollback() // nolint

// Prepare spawn infos
m1 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}}
m2 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}}
infos := []sdk.SpawnInfo{
{
RemoteTime: getRemoteTime(ctx),
Message: m1,
UserMessage: m1.DefaultUserMessage(),
},
{
RemoteTime: getRemoteTime(ctx),
Message: m2,
UserMessage: m2.DefaultUserMessage(),
},
}
infos := []sdk.SpawnInfo{{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}},
}, {
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}},
}}

// Take node job run
job, report, err := workflow.TakeNodeJobRun(ctx, tx, store, *p, id, workerModel, wk.Name, wk.ID, infos, hatcheryName)
job, report, err := workflow.TakeNodeJobRun(ctx, tx, api.Cache, *p, id, workerModel, wk.Name, wk.ID, infos, hatcheryName)
if err != nil {
return nil, sdk.WrapError(err, "cannot take job %d", id)
}
Expand Down Expand Up @@ -213,10 +204,6 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
wnjri.WorkflowName = workflowRun.Workflow.Name
wnjri.NodeRunName = noderun.WorkflowNodeName

if err := tx.Commit(); err != nil {
return nil, sdk.WithStack(err)
}

secretsReqs := job.Job.Action.Requirements.FilterByType(sdk.SecretRequirement).Values()
secretsReqsRegs := make([]*regexp.Regexp, 0, len(secretsReqs))
for i := range secretsReqs {
Expand All @@ -228,7 +215,17 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
}

// Filter project's secrets depending of the region requirement that was set on job
skipProjectSecrets := job.Region != nil && sdk.IsInArray(*job.Region, skipProjectSecretsOnRegion)
skipProjectSecrets := job.Region != nil && sdk.IsInArray(*job.Region, api.Config.Secrets.SkipProjectSecretsOnRegion)
if skipProjectSecrets {
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, []sdk.SpawnInfo{{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoDisableSecretInjection.ID, Args: []interface{}{*job.Region}},
}}); err != nil {
return nil, sdk.WrapError(err, "cannot save spawn info job %d", job.ID)
}
}

var countMatchedSecrets int
for i := range secrets {
if skipProjectSecrets && secrets[i].Context == workflow.SecretProjContext {
var inRequirements bool
Expand All @@ -241,10 +238,24 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
if !inRequirements {
continue
}
countMatchedSecrets++
}
wnjri.Secrets = append(wnjri.Secrets, secrets[i].ToVariable())
}

if skipProjectSecrets && len(secretsReqs) > 0 {
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, []sdk.SpawnInfo{{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoManualSecretInjection.ID, Args: []interface{}{fmt.Sprintf("%d", countMatchedSecrets)}},
}}); err != nil {
return nil, sdk.WrapError(err, "cannot save spawn info job %d", job.ID)
}
}

if err := tx.Commit(); err != nil {
return nil, sdk.WithStack(err)
}

return report, nil
}

Expand Down Expand Up @@ -392,12 +403,12 @@ func (api *API) postSpawnInfosWorkflowJobHandler() service.Handler {

var s []sdk.SpawnInfo
if err := service.UnmarshalBody(r, &s); err != nil {
return sdk.WrapError(err, "Cannot unmarshal request")
return sdk.WrapError(err, "cannot unmarshal request")
}

tx, errBegin := api.mustDB().Begin()
if errBegin != nil {
return sdk.WrapError(errBegin, "Cannot start transaction")
tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WrapError(err, "cannot start transaction")
}
defer tx.Rollback() // nolint

Expand Down Expand Up @@ -531,24 +542,18 @@ func (api *API) postJobResult(ctx context.Context, tx gorpmapper.SqlExecutorWith

// Add spawn info
if wr != nil {
msg := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name}}
infos := []sdk.SpawnInfo{{
RemoteTime: res.RemoteTime,
Message: msg,
UserMessage: msg.DefaultUserMessage(),
}}
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil {
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, []sdk.SpawnInfo{{
RemoteTime: res.RemoteTime,
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name}},
}}); err != nil {
return nil, sdk.WrapError(err, "Cannot save spawn info job %d", job.ID)
}
}
if hatch != nil && res.Status == sdk.StatusFail {
msg := sdk.SpawnMsg{ID: sdk.MsgSpawnErrorHatcheryRetryAttempt.ID, Args: []interface{}{hatch.Name, res.Reason}}
infos := []sdk.SpawnInfo{{
RemoteTime: res.RemoteTime,
Message: msg,
UserMessage: msg.DefaultUserMessage(),
}}
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil {
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, []sdk.SpawnInfo{{
RemoteTime: res.RemoteTime,
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnErrorHatcheryRetryAttempt.ID, Args: []interface{}{hatch.Name, res.Reason}},
}}); err != nil {
return nil, sdk.WrapError(err, "Cannot save spawn info job %d", job.ID)
}

Expand Down
17 changes: 4 additions & 13 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,6 @@ func (api *API) stopWorkflowRun(ctx context.Context, p *sdk.Project, run *sdk.Wo
defer tx.Rollback() //nolint

spwnMsg := sdk.SpawnMsgNew(*sdk.MsgWorkflowNodeStop, ident.GetUsername())
stopInfos := sdk.SpawnInfo{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: spwnMsg,
UserMessage: spwnMsg.DefaultUserMessage(),
}

workflow.AddWorkflowRunInfo(run, spwnMsg)

for _, wn := range run.WorkflowNodeRuns {
Expand All @@ -439,7 +432,9 @@ func (api *API) stopWorkflowRun(ctx context.Context, p *sdk.Project, run *sdk.Wo
continue
}

r1, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *run, wnr, stopInfos)
r1, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *run, wnr, sdk.SpawnInfo{
Message: spwnMsg,
})
if err != nil {
return nil, sdk.WrapError(err, "unable to stop workflow node run %d", wnr.ID)
}
Expand Down Expand Up @@ -725,12 +720,8 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler {
return sdk.WrapError(err, "unable to load workflow node run with id %d for workflow %s and run with number %d", workflowNodeRunID, workflowName, workflowRun.Number)
}

sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}}
r1, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *workflowRun, *workflowNodeRun, sdk.SpawnInfo{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}},
})
if err != nil {
return sdk.WrapError(err, "unable to stop workflow node run")
Expand Down
13 changes: 4 additions & 9 deletions engine/worker/internal/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,9 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W
for _, r := range errRequirements {
details += fmt.Sprintf(" %s(%s)", r.Value, r.Type)
}
sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
Expand All @@ -171,12 +169,9 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W
pluginsOK, errPlugins := checkPlugins(ctx, w, *wjob)
if !pluginsOK {
var details = errPlugins.Error()

sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
Expand Down
6 changes: 2 additions & 4 deletions engine/worker/internal/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,9 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er

// Send the reason as a spawninfo
if res.Status != sdk.StatusSuccess && res.Reason != "" {
sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}},
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, job.ID, infos); err != nil {
log.Error(ctx, "processJob> Unable to send spawn info: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions sdk/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ type SpawnInfo struct {

// SpawnMsg represents a msg for spawnInfo
type SpawnMsg struct {
ID string `json:"id" db:"-"`
Args []interface{} `json:"args" db:"-"`
Type string `json:"type" db:"-"`
ID string `json:"id,omitempty" db:"-"`
Args []interface{} `json:"args,omitempty" db:"-"`
Type string `json:"type,omitempty" db:"-"`
}

func SpawnMsgNew(msg Message, args ...interface{}) SpawnMsg {
Expand Down
2 changes: 1 addition & 1 deletion sdk/hatchery/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func SendSpawnInfo(ctx context.Context, h Interface, jobID int64, spawnMsg sdk.S
if h.CDSClient() == nil || jobID == 0 {
return
}
infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg, UserMessage: spawnMsg.DefaultUserMessage()}}
infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg}}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := h.CDSClient().QueueJobSendSpawnInfo(ctx, jobID, infos); err != nil {
Expand Down
Loading

0 comments on commit 6094e20

Please sign in to comment.