diff --git a/engine/api/workflow/execute_node_job_run.go b/engine/api/workflow/execute_node_job_run.go index f6519054cb..875cc1b78d 100644 --- a/engine/api/workflow/execute_node_job_run.go +++ b/engine/api/workflow/execute_node_job_run.go @@ -236,9 +236,10 @@ func PrepareSpawnInfos(infos []sdk.SpawnInfo) []sdk.SpawnInfo { prepared := []sdk.SpawnInfo{} for _, info := range infos { prepared = append(prepared, sdk.SpawnInfo{ - APITime: now, - RemoteTime: info.RemoteTime, - Message: info.Message, + APITime: now, + RemoteTime: info.RemoteTime, + Message: info.Message, + UserMessage: info.Message.DefaultUserMessage(), }) } return prepared diff --git a/engine/api/workflow/execute_node_run.go b/engine/api/workflow/execute_node_run.go index ff4cd15956..468feb493e 100644 --- a/engine/api/workflow/execute_node_run.go +++ b/engine/api/workflow/execute_node_run.go @@ -529,16 +529,19 @@ jobLoop: } msg.Args = []interface{}{sdk.Cause(e).Error()} wjob.SpawnInfos = append(wjob.SpawnInfos, sdk.SpawnInfo{ - APITime: time.Now(), - Message: msg, - RemoteTime: time.Now(), + APITime: time.Now(), + Message: msg, + RemoteTime: time.Now(), + UserMessage: msg.DefaultUserMessage(), }) } } else { + sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID} wjob.SpawnInfos = []sdk.SpawnInfo{{ - APITime: time.Now(), - Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobInQueue.ID}, - RemoteTime: time.Now(), + APITime: time.Now(), + Message: sp, + RemoteTime: time.Now(), + UserMessage: sp.DefaultUserMessage(), }} } diff --git a/engine/api/workflow/process.go b/engine/api/workflow/process.go index fc786a6b47..bec31069b3 100644 --- a/engine/api/workflow/process.go +++ b/engine/api/workflow/process.go @@ -77,10 +77,11 @@ func checkCondition(ctx context.Context, wr *sdk.WorkflowRun, conditions sdk.Wor func AddWorkflowRunInfo(run *sdk.WorkflowRun, infos ...sdk.SpawnMsg) { for _, i := range infos { run.Infos = append(run.Infos, sdk.WorkflowRunInfo{ - APITime: time.Now(), - Message: i, - Type: i.Type, - SubNumber: run.LastSubNumber, + APITime: time.Now(), + Message: i, + Type: i.Type, + SubNumber: run.LastSubNumber, + UserMessage: i.DefaultUserMessage(), }) } } diff --git a/engine/api/workflow/run_workflow_test.go b/engine/api/workflow/run_workflow_test.go index 43b9297468..2f2350459e 100644 --- a/engine/api/workflow/run_workflow_test.go +++ b/engine/api/workflow/run_workflow_test.go @@ -589,14 +589,14 @@ queueRun: t.FailNow() } + sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoHatcheryStarts.ID} //AddSpawnInfosNodeJobRun err := workflow.AddSpawnInfosNodeJobRun(db, j.WorkflowNodeRunID, j.ID, []sdk.SpawnInfo{ { - APITime: time.Now(), - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ - ID: sdk.MsgSpawnInfoHatcheryStarts.ID, - }, + APITime: time.Now(), + RemoteTime: time.Now(), + Message: sp, + UserMessage: sp.DefaultUserMessage(), }, }) assert.NoError(t, err) @@ -605,15 +605,15 @@ queueRun: t.FailNow() } + sp = sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID} //TakeNodeJobRun takenJobID := j.ID takenJob, _, _ := workflow.TakeNodeJobRun(context.TODO(), db, cache, *proj, takenJobID, "model", "worker", "1", []sdk.SpawnInfo{ { - APITime: time.Now(), - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ - ID: sdk.MsgSpawnInfoJobTaken.ID, - }, + APITime: time.Now(), + RemoteTime: time.Now(), + Message: sp, + UserMessage: sp.DefaultUserMessage(), }, }, "hatchery_name") diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index 904f4ed7df..7bbd7150ce 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -123,14 +123,18 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, defer tx.Rollback() // nolint //Prepare spawn infos + m1 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}} + m2 := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}} infos := []sdk.SpawnInfo{ { - RemoteTime: getRemoteTime(ctx), - Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTaken.ID, Args: []interface{}{fmt.Sprintf("%d", id), wk.Name}}, + RemoteTime: getRemoteTime(ctx), + Message: m1, + UserMessage: m1.DefaultUserMessage(), }, { - RemoteTime: getRemoteTime(ctx), - Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoJobTakenWorkerVersion.ID, Args: []interface{}{wk.Name, wk.Version, wk.OS, wk.Arch}}, + RemoteTime: getRemoteTime(ctx), + Message: m2, + UserMessage: m2.DefaultUserMessage(), }, } @@ -444,9 +448,11 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap observability.Tag(observability.TagWorkflowNodeRun, job.WorkflowNodeRunID), observability.Tag(observability.TagJob, job.Job.Action.Name)) + msg := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}} infos := []sdk.SpawnInfo{{ - RemoteTime: res.RemoteTime, - Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}}, + RemoteTime: res.RemoteTime, + Message: msg, + UserMessage: msg.DefaultUserMessage(), }} if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil { diff --git a/engine/api/workflow_run.go b/engine/api/workflow_run.go index 5ec8404ee4..6f94cc0af9 100644 --- a/engine/api/workflow_run.go +++ b/engine/api/workflow_run.go @@ -410,9 +410,10 @@ func stopWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache spwnMsg := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{ident.GetUsername()}, Type: sdk.MsgWorkflowNodeStop.Type} stopInfos := sdk.SpawnInfo{ - APITime: time.Now(), - RemoteTime: time.Now(), - Message: spwnMsg, + APITime: time.Now(), + RemoteTime: time.Now(), + Message: spwnMsg, + UserMessage: spwnMsg.DefaultUserMessage(), } workflow.AddWorkflowRunInfo(run, spwnMsg) @@ -685,10 +686,12 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler { return sdk.WrapError(err, "unable to load workflow node run with id %d for workflow %s and run with number %d", workflowNodeRunID, workflowName, workflowRun.Number) } + sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}} report, err := workflow.StopWorkflowNodeRun(ctx, api.mustDB, api.Cache, *p, *workflowRun, *workflowNodeRun, sdk.SpawnInfo{ - APITime: time.Now(), - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowNodeStop.ID, Args: []interface{}{getAPIConsumer(ctx).GetUsername()}}, + APITime: time.Now(), + RemoteTime: time.Now(), + Message: sp, + UserMessage: sp.DefaultUserMessage(), }) if err != nil { return sdk.WrapError(err, "unable to stop workflow node run") @@ -1238,8 +1241,10 @@ func (api *API) getWorkflowNodeRunJobSpawnInfosHandler() service.Handler { l := r.Header.Get("Accept-Language") for ki, info := range spawnInfos { - m := sdk.NewMessage(sdk.Messages[info.Message.ID], info.Message.Args...) - spawnInfos[ki].UserMessage = m.String(l) + if _, ok := sdk.Messages[info.Message.ID]; ok { + m := sdk.NewMessage(sdk.Messages[info.Message.ID], info.Message.Args...) + spawnInfos[ki].UserMessage = m.String(l) + } } return service.WriteJSON(w, spawnInfos, http.StatusOK) } diff --git a/engine/worker/internal/start.go b/engine/worker/internal/start.go index 2cd8ff7182..b61cec9bca 100644 --- a/engine/worker/internal/start.go +++ b/engine/worker/internal/start.go @@ -152,9 +152,11 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W for _, r := range errRequirements { details += fmt.Sprintf(" %s(%s)", r.Value, r.Type) } + sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}} infos := []sdk.SpawnInfo{{ - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}, + RemoteTime: time.Now(), + Message: sp, + UserMessage: sp.DefaultUserMessage(), }} if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil { return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID) @@ -166,9 +168,11 @@ func processBookedWJob(ctx context.Context, w *CurrentWorker, wjobs chan<- sdk.W if !pluginsOK { var details = errPlugins.Error() + sp := sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}} infos := []sdk.SpawnInfo{{ - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerForJobError.ID, Args: []interface{}{w.Name(), details}}, + RemoteTime: time.Now(), + Message: sp, + UserMessage: sp.DefaultUserMessage(), }} if err := w.Client().QueueJobSendSpawnInfo(ctx, wjob.ID, infos); err != nil { return sdk.WrapError(err, "Cannot record QueueJobSendSpawnInfo for job (err spawn): %d", wjob.ID) diff --git a/engine/worker/internal/take.go b/engine/worker/internal/take.go index f48ac99f73..3ac49c398c 100644 --- a/engine/worker/internal/take.go +++ b/engine/worker/internal/take.go @@ -118,9 +118,11 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er // Send the reason as a spawninfo if res.Status != sdk.StatusSuccess && res.Reason != "" { + sp := sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}} infos := []sdk.SpawnInfo{{ - RemoteTime: time.Now(), - Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}}, + RemoteTime: time.Now(), + Message: sp, + UserMessage: sp.DefaultUserMessage(), }} if err := w.Client().QueueJobSendSpawnInfo(ctx, job.ID, infos); err != nil { log.Error(ctx, "processJob> Unable to send spawn info: %v", err) diff --git a/sdk/build.go b/sdk/build.go index 6613ee836c..26243acd71 100644 --- a/sdk/build.go +++ b/sdk/build.go @@ -1,6 +1,7 @@ package sdk import ( + "fmt" "time" ) @@ -20,6 +21,14 @@ type SpawnMsg struct { Type string `json:"type" db:"-"` } +func (s SpawnMsg) DefaultUserMessage() string { + if _, ok := Messages[s.ID]; ok { + m := Messages[s.ID] + return fmt.Sprintf(m.Format[EN], s.Args...) + } + return "" +} + // ExecutedJob represents a running job type ExecutedJob struct { Job diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index 9bef179e26..a3f0f48a1b 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -431,7 +431,7 @@ func SendSpawnInfo(ctx context.Context, h Interface, jobID int64, spawnMsg sdk.S if h.CDSClient() == nil || jobID == 0 { return } - infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg}} + infos := []sdk.SpawnInfo{{RemoteTime: time.Now(), Message: spawnMsg, UserMessage: spawnMsg.DefaultUserMessage()}} ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := h.CDSClient().QueueJobSendSpawnInfo(ctx, jobID, infos); err != nil { diff --git a/sdk/workflow_run.go b/sdk/workflow_run.go index 597a0cccd6..16db34642d 100644 --- a/sdk/workflow_run.go +++ b/sdk/workflow_run.go @@ -79,8 +79,10 @@ type WorkflowRunNumber struct { // Translate translates messages in WorkflowNodeRun func (r *WorkflowRun) Translate(lang string) { for ki, info := range r.Infos { - m := NewMessage(Messages[info.Message.ID], info.Message.Args...) - r.Infos[ki].UserMessage = m.String(lang) + if _, ok := Messages[info.Message.ID]; ok { + m := NewMessage(Messages[info.Message.ID], info.Message.Args...) + r.Infos[ki].UserMessage = m.String(lang) + } } } @@ -419,8 +421,10 @@ type WorkflowNodeJobRunInfo struct { // Translate translates messages in WorkflowNodeJobRun func (wnjr *WorkflowNodeJobRun) Translate(lang string) { for ki, info := range wnjr.SpawnInfos { - m := NewMessage(Messages[info.Message.ID], info.Message.Args...) - wnjr.SpawnInfos[ki].UserMessage = m.String(lang) + if _, ok := Messages[info.Message.ID]; ok { + m := NewMessage(Messages[info.Message.ID], info.Message.Args...) + wnjr.SpawnInfos[ki].UserMessage = m.String(lang) + } } }