Skip to content

Commit

Permalink
fix(api): avoid panic on err msg deleted (#5245)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault authored Jun 12, 2020
1 parent 9631d15 commit 0d46f8a
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 48 deletions.
7 changes: 4 additions & 3 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ func PrepareSpawnInfos(infos []sdk.SpawnInfo) []sdk.SpawnInfo {
prepared := []sdk.SpawnInfo{}
for _, info := range infos {
prepared = append(prepared, sdk.SpawnInfo{
APITime: now,
RemoteTime: info.RemoteTime,
Message: info.Message,
APITime: now,
RemoteTime: info.RemoteTime,
Message: info.Message,
UserMessage: info.Message.DefaultUserMessage(),
})
}
return prepared
Expand Down
15 changes: 9 additions & 6 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,16 +529,19 @@ jobLoop:
}
msg.Args = []interface{}{sdk.Cause(e).Error()}
wjob.SpawnInfos = append(wjob.SpawnInfos, sdk.SpawnInfo{
APITime: time.Now(),
Message: msg,
RemoteTime: time.Now(),
APITime: time.Now(),
Message: msg,
RemoteTime: time.Now(),
UserMessage: msg.DefaultUserMessage(),
})
}
} else {
sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID}
wjob.SpawnInfos = []sdk.SpawnInfo{{
APITime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID},
RemoteTime: time.Now(),
APITime: time.Now(),
Message: sp,
RemoteTime: time.Now(),
UserMessage: sp.DefaultUserMessage(),
}}
}

Expand Down
9 changes: 5 additions & 4 deletions engine/api/workflow/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor
func AddWorkflowRunInfo(run *sdk.WorkflowRun, infos ...sdk.SpawnMsg) {
for _, i := range infos {
run.Infos = append(run.Infos, sdk.WorkflowRunInfo{
APITime: time.Now(),
Message: i,
Type: i.Type,
SubNumber: run.LastSubNumber,
APITime: time.Now(),
Message: i,
Type: i.Type,
SubNumber: run.LastSubNumber,
UserMessage: i.DefaultUserMessage(),
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,14 +589,14 @@ queueRun:
t.FailNow()
}

sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoHatcheryStarts.ID}
//AddSpawnInfosNodeJobRun
err := workflow.AddSpawnInfosNodeJobRun(db, j.WorkflowNodeRunID, j.ID, []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoHatcheryStarts.ID,
},
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
},
})
assert.NoError(t, err)
Expand All @@ -605,15 +605,15 @@ queueRun:
t.FailNow()
}

sp = sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID}
//TakeNodeJobRun
takenJobID := j.ID
takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, takenJobID, "model", "worker", "1", []sdk.SpawnInfo{
{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{
ID: sdk.MsgSpawnInfoJobTaken.ID,
},
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
},
}, "hatchery_name")

Expand Down
18 changes: 12 additions & 6 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,18 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
defer tx.Rollback() // nolint

//Prepare spawn infos
m1 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}}
m2 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}}
infos := []sdk.SpawnInfo{
{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}},
RemoteTime: getRemoteTime(ctx),
Message: m1,
UserMessage: m1.DefaultUserMessage(),
},
{
RemoteTime: getRemoteTime(ctx),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}},
RemoteTime: getRemoteTime(ctx),
Message: m2,
UserMessage: m2.DefaultUserMessage(),
},
}

Expand Down Expand Up @@ -444,9 +448,11 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap
observability.Tag(observability.TagWorkflowNodeRun, job.WorkflowNodeRunID),
observability.Tag(observability.TagJob, job.Job.Action.Name))

msg := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}}
infos := []sdk.SpawnInfo{{
RemoteTime: res.RemoteTime,
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}},
RemoteTime: res.RemoteTime,
Message: msg,
UserMessage: msg.DefaultUserMessage(),
}}

if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil {
Expand Down
21 changes: 13 additions & 8 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,10 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache
spwnMsg := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{ident.GetUsername()}, Type: sdk.MsgWorkflowNodeStop.Type}

stopInfos := sdk.SpawnInfo{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: spwnMsg,
APITime: time.Now(),
RemoteTime: time.Now(),
Message: spwnMsg,
UserMessage: spwnMsg.DefaultUserMessage(),
}

workflow.AddWorkflowRunInfo(run, spwnMsg)
Expand Down Expand Up @@ -685,10 +686,12 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler {
return sdk.WrapError(err, "unable to load workflow node run with id %d for workflow %s and run with number %d", workflowNodeRunID, workflowName, workflowRun.Number)
}

sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}}
report, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *workflowRun, *workflowNodeRun, sdk.SpawnInfo{
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}},
APITime: time.Now(),
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
})
if err != nil {
return sdk.WrapError(err, "unable to stop workflow node run")
Expand Down Expand Up @@ -1238,8 +1241,10 @@ func (api *API) getWorkflowNodeRunJobSpawnInfosHandler() service.Handler {

l := r.Header.Get("Accept-Language")
for ki, info := range spawnInfos {
m := sdk.NewMessage(sdk.Messages[info.Message.ID], info.Message.Args...)
spawnInfos[ki].UserMessage = m.String(l)
if _, ok := sdk.Messages[info.Message.ID]; ok {
m := sdk.NewMessage(sdk.Messages[info.Message.ID], info.Message.Args...)
spawnInfos[ki].UserMessage = m.String(l)
}
}
return service.WriteJSON(w, spawnInfos, http.StatusOK)
}
Expand Down
12 changes: 8 additions & 4 deletions engine/worker/internal/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W
for _, r := range errRequirements {
details += fmt.Sprintf(" %s(%s)", r.Value, r.Type)
}
sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}},
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
Expand All @@ -166,9 +168,11 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W
if !pluginsOK {
var details = errPlugins.Error()

sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}},
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil {
return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID)
Expand Down
6 changes: 4 additions & 2 deletions engine/worker/internal/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er

// Send the reason as a spawninfo
if res.Status != sdk.StatusSuccess && res.Reason != "" {
sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}}
infos := []sdk.SpawnInfo{{
RemoteTime: time.Now(),
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}},
RemoteTime: time.Now(),
Message: sp,
UserMessage: sp.DefaultUserMessage(),
}}
if err := w.Client().QueueJobSendSpawnInfo(ctx, job.ID, infos); err != nil {
log.Error(ctx, "processJob> Unable to send spawn info: %v", err)
Expand Down
9 changes: 9 additions & 0 deletions sdk/build.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sdk

import (
"fmt"
"time"
)

Expand All @@ -20,6 +21,14 @@ type SpawnMsg struct {
Type string `json:"type" db:"-"`
}

func (s SpawnMsg) DefaultUserMessage() string {
if _, ok := Messages[s.ID]; ok {
m := Messages[s.ID]
return fmt.Sprintf(m.Format[EN], s.Args...)
}
return ""
}

// ExecutedJob represents a running job
type ExecutedJob struct {
Job
Expand Down
2 changes: 1 addition & 1 deletion sdk/hatchery/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func SendSpawnInfo(ctx context.Context, h Interface, jobID int64, spawnMsg sdk.S
if h.CDSClient() == nil || jobID == 0 {
return
}
infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg}}
infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg, UserMessage: spawnMsg.DefaultUserMessage()}}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := h.CDSClient().QueueJobSendSpawnInfo(ctx, jobID, infos); err != nil {
Expand Down
12 changes: 8 additions & 4 deletions sdk/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ type WorkflowRunNumber struct {
// Translate translates messages in WorkflowNodeRun
func (r *WorkflowRun) Translate(lang string) {
for ki, info := range r.Infos {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
r.Infos[ki].UserMessage = m.String(lang)
if _, ok := Messages[info.Message.ID]; ok {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
r.Infos[ki].UserMessage = m.String(lang)
}
}
}

Expand Down Expand Up @@ -419,8 +421,10 @@ type WorkflowNodeJobRunInfo struct {
// Translate translates messages in WorkflowNodeJobRun
func (wnjr *WorkflowNodeJobRun) Translate(lang string) {
for ki, info := range wnjr.SpawnInfos {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
wnjr.SpawnInfos[ki].UserMessage = m.String(lang)
if _, ok := Messages[info.Message.ID]; ok {
m := NewMessage(Messages[info.Message.ID], info.Message.Args...)
wnjr.SpawnInfos[ki].UserMessage = m.String(lang)
}
}
}

Expand Down

0 comments on commit 0d46f8a

Please sign in to comment.