diff --git a/engine/api/api.go b/engine/api/api.go index 7b8bebbc8c..7d1b44305a 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -752,7 +752,7 @@ func (a *API) Serve(ctx context.Context) error { }, a.PanicDump()) sdk.GoRoutine(ctx, "workflow.Initialize", func(ctx context.Context) { - workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch) + workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch, a.Config.Log.StepMaxSize) }, a.PanicDump()) sdk.GoRoutine(ctx, "PushInElasticSearch", func(ctx context.Context) { diff --git a/engine/api/worker.go b/engine/api/worker.go index 5f1f516f8a..1528b8cba8 100644 --- a/engine/api/worker.go +++ b/engine/api/worker.go @@ -158,7 +158,7 @@ func (api *API) disableWorkerHandler() service.Handler { } } - if err := DisableWorker(ctx, api.mustDB(), id); err != nil { + if err := DisableWorker(ctx, api.mustDB(), id, api.Config.Log.StepMaxSize); err != nil { cause := sdk.Cause(err) if cause == worker.ErrNoWorker || cause == sql.ErrNoRows { return sdk.WrapError(sdk.ErrWrongRequest, "disableWorkerHandler> worker %s does not exists", id) @@ -190,7 +190,7 @@ func (api *API) postUnregisterWorkerHandler() service.Handler { if err != nil { return err } - if err := DisableWorker(ctx, api.mustDB(), wk.ID); err != nil { + if err := DisableWorker(ctx, api.mustDB(), wk.ID, api.Config.Log.StepMaxSize); err != nil { return sdk.WrapError(err, "cannot delete worker %s", wk.Name) } return nil @@ -224,7 +224,7 @@ func (api *API) workerWaitingHandler() service.Handler { // the package workflow // DisableWorker disable a worker -func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error { +func DisableWorker(ctx context.Context, db *gorp.DbMap, id string, maxLogSize int64) error { tx, errb := db.Begin() if errb != nil { return fmt.Errorf("DisableWorker> Cannot start tx: %v", errb) @@ -244,7 +244,7 @@ func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error { // We need to restart this action wNodeJob, errL := workflow.LoadNodeJobRun(ctx, tx, nil, jobID.Int64) if errL == nil && wNodeJob.Retry < 3 { - if err := workflow.RestartWorkflowNodeJob(context.TODO(), db, *wNodeJob); err != nil { + if err := workflow.RestartWorkflowNodeJob(context.TODO(), db, *wNodeJob, maxLogSize); err != nil { log.Warning(ctx, "DisableWorker[%s]> Cannot restart workflow node run: %v", name, err) } else { log.Info(ctx, "DisableWorker[%s]> WorkflowNodeRun %d restarted after crash", name, jobID.Int64) diff --git a/engine/api/workflow/execute_node_job_run.go b/engine/api/workflow/execute_node_job_run.go index c9e8fbe82e..8756446ea0 100644 --- a/engine/api/workflow/execute_node_job_run.go +++ b/engine/api/workflow/execute_node_job_run.go @@ -566,19 +566,21 @@ func FreeNodeJobRun(ctx context.Context, store cache.Store, id int64) error { return sdk.WrapError(sdk.ErrJobNotBooked, "BookNodeJobRun> job %d already released", id) } -//AddLog adds a build log -func AddLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.Log, maxLogSize int64) error { - if job != nil { - logs.JobID = job.ID - logs.NodeRunID = job.WorkflowNodeRunID - } - +// AppendLog adds a build log. +func AppendLog(db gorp.SqlExecutor, jobID, nodeRunID, stepOrder int64, val string, maxLogSize int64) error { // check if log exists without loading data but with log size - exists, size, err := ExistsStepLog(db, logs.JobID, logs.StepOrder) + exists, size, err := ExistsStepLog(db, jobID, stepOrder) if err != nil { return sdk.WrapError(err, "cannot check if log exists") } + logs := &sdk.Log{ + JobID: jobID, + NodeRunID: nodeRunID, + StepOrder: stepOrder, + Val: val, + } + // ignore the log if max size already reached if maxReached := truncateLogs(maxLogSize, size, logs); maxReached { log.Debug("truncated logs") @@ -628,7 +630,7 @@ func AddServiceLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.S } // RestartWorkflowNodeJob restart all workflow node job and update logs to indicate restart -func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun) error { +func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun, maxLogSize int64) error { var end func() ctx, end = observability.Span(ctx, "workflow.RestartWorkflowNodeJob") defer end() @@ -638,43 +640,38 @@ func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob s if step.Status == sdk.StatusNeverBuilt || step.Status == sdk.StatusSkipped || step.Status == sdk.StatusDisabled { continue } - l, errL := LoadStepLogs(db, wNodeJob.ID, int64(step.StepOrder)) - if errL != nil { - return sdk.WrapError(errL, "RestartWorkflowNodeJob> error while load step logs") - } wNodeJob.Job.Reason = "Killed (Reason: Timeout)\n" step.Status = sdk.StatusWaiting step.Done = time.Time{} - if l != nil { // log could be nil here - l.Done = nil - logbuf := bytes.NewBufferString(l.Val) - logbuf.WriteString("\n\n\n-=-=-=-=-=- Worker timeout: job replaced in queue -=-=-=-=-=-\n\n\n") - l.Val = logbuf.String() - if err := updateLog(db, l); err != nil { - return sdk.WrapError(errL, "RestartWorkflowNodeJob> error while update step log") - } + if err := AppendLog( + db, wNodeJob.ID, wNodeJob.WorkflowNodeRunID, int64(step.StepOrder), + "\n\n\n-=-=-=-=-=- Worker timeout: job replaced in queue -=-=-=-=-=-\n\n\n", + maxLogSize, + ); err != nil { + return err } } - nodeRun, errNR := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID) - if errNR != nil { - return errNR + + nodeRun, err := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID) + if err != nil { + return err } //Synchronize struct but not in db - sync, errS := SyncNodeRunRunJob(ctx, db, nodeRun, wNodeJob) - if errS != nil { - return sdk.WrapError(errS, "RestartWorkflowNodeJob> error on sync nodeJobRun") + sync, err := SyncNodeRunRunJob(ctx, db, nodeRun, wNodeJob) + if err != nil { + return sdk.WrapError(err, "error on sync nodeJobRun") } if !sync { - log.Warning(ctx, "RestartWorkflowNodeJob> sync doesn't find a nodeJobRun") + log.Warning(ctx, "sync doesn't find a nodeJobRun") } - if errU := UpdateNodeRun(db, nodeRun); errU != nil { - return sdk.WrapError(errU, "RestartWorkflowNodeJob> Cannot update node run") + if err := UpdateNodeRun(db, nodeRun); err != nil { + return sdk.WrapError(err, "cannot update node run") } if err := replaceWorkflowJobRunInQueue(db, wNodeJob); err != nil { - return sdk.WrapError(err, "Cannot replace workflow job in queue") + return sdk.WrapError(err, "cannot replace workflow job in queue") } return nil diff --git a/engine/api/workflow/execute_node_job_run_log.go b/engine/api/workflow/execute_node_job_run_log.go index abfbf2e052..90d81710f7 100644 --- a/engine/api/workflow/execute_node_job_run_log.go +++ b/engine/api/workflow/execute_node_job_run_log.go @@ -96,6 +96,10 @@ func LoadLogs(db gorp.SqlExecutor, id int64) ([]sdk.Log, error) { } func insertLog(db gorp.SqlExecutor, logs *sdk.Log) error { + now := time.Now() + logs.Start = &now + logs.LastModified = &now + query := ` INSERT INTO workflow_node_run_job_logs (workflow_node_run_job_id, workflow_node_run_id, start, last_modified, done, step_order, value) VALUES ($1, $2, $3, $4, $5, $6, $7) @@ -105,26 +109,18 @@ func insertLog(db gorp.SqlExecutor, logs *sdk.Log) error { func updateLog(db gorp.SqlExecutor, logs *sdk.Log) error { now := time.Now() - if logs.Start == nil { - logs.Start = &now - } - if logs.LastModified == nil { - logs.LastModified = &now - } - if logs.Done == nil { - logs.Done = &now - } + logs.LastModified = &now + logs.Done = &now query := ` UPDATE workflow_node_run_job_logs set workflow_node_run_id = $3, - start = $4, - last_modified = $5, - done = $6, - value = value || $7 + last_modified = $4, + done = $5, + value = value || $6 WHERE workflow_node_run_job_id = $1 AND step_order = $2` - if _, err := db.Exec(query, logs.JobID, logs.StepOrder, logs.NodeRunID, logs.Start, logs.LastModified, logs.Done, logs.Val); err != nil { + if _, err := db.Exec(query, logs.JobID, logs.StepOrder, logs.NodeRunID, logs.LastModified, logs.Done, logs.Val); err != nil { return sdk.WithStack(err) } return nil diff --git a/engine/api/workflow/heartbeat.go b/engine/api/workflow/heartbeat.go index b2ced02561..de55f44d26 100644 --- a/engine/api/workflow/heartbeat.go +++ b/engine/api/workflow/heartbeat.go @@ -13,7 +13,7 @@ import ( const maxRetry = 3 // manageDeadJob restart all jobs which are building but without worker -func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) error { +func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, maxLogSize int64) error { db := DBFunc() deadJobs, err := LoadDeadNodeJobRun(ctx, db, store) if err != nil { @@ -41,7 +41,7 @@ func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.S continue } } else { - if err := RestartWorkflowNodeJob(ctx, tx, deadJob); err != nil { + if err := RestartWorkflowNodeJob(ctx, tx, deadJob, maxLogSize); err != nil { log.Warning(ctx, "manageDeadJob> Cannot restart node job run %d: %v", deadJob.ID, err) _ = tx.Rollback() continue diff --git a/engine/api/workflow/init.go b/engine/api/workflow/init.go index 8811a76f0b..f26912bdfd 100644 --- a/engine/api/workflow/init.go +++ b/engine/api/workflow/init.go @@ -13,7 +13,7 @@ import ( var baseUIURL, defaultOS, defaultArch string //Initialize starts goroutines for workflows -func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, uiURL, confDefaultOS, confDefaultArch string) { +func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, uiURL, confDefaultOS, confDefaultArch string, maxLogSize int64) { baseUIURL = uiURL defaultOS = confDefaultOS defaultArch = confDefaultArch @@ -31,7 +31,7 @@ func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Stor return } case <-tickHeart.C: - if err := manageDeadJob(ctx, DBFunc, store); err != nil { + if err := manageDeadJob(ctx, DBFunc, store, maxLogSize); err != nil { log.Warning(ctx, "workflow.manageDeadJob> Error on restartDeadJob : %v", err) } case <-tickStop.C: diff --git a/engine/api/workflow/run_workflow_test.go b/engine/api/workflow/run_workflow_test.go index 2f2350459e..fe6ac75105 100644 --- a/engine/api/workflow/run_workflow_test.go +++ b/engine/api/workflow/run_workflow_test.go @@ -634,16 +634,12 @@ queueRun: assert.Len(t, secrets, 1) //TestAddLog - assert.NoError(t, workflow.AddLog(db, j, &sdk.Log{ - Val: "This is a log", - }, workflow.DefaultMaxLogSize)) + assert.NoError(t, workflow.AppendLog(db, j.ID, j.WorkflowNodeRunID, 1, "This is a log", workflow.DefaultMaxLogSize)) if t.Failed() { tx.Rollback() t.FailNow() } - assert.NoError(t, workflow.AddLog(db, j, &sdk.Log{ - Val: "This is another log", - }, workflow.DefaultMaxLogSize)) + assert.NoError(t, workflow.AppendLog(db, j.ID, j.WorkflowNodeRunID, 1, "This is another log", workflow.DefaultMaxLogSize)) if t.Failed() { tx.Rollback() t.FailNow() diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index 642f388f5a..cf74fd1f34 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -567,7 +567,7 @@ func (api *API) postWorkflowJobLogsHandler() service.Handler { return err } - if err := workflow.AddLog(api.mustDB(), pbJob, &logs, api.Config.Log.StepMaxSize); err != nil { + if err := workflow.AppendLog(api.mustDB(), pbJob.ID, pbJob.WorkflowNodeRunID, logs.StepOrder, logs.Val, api.Config.Log.StepMaxSize); err != nil { return err } diff --git a/engine/api/workflow_run_test.go b/engine/api/workflow_run_test.go index b64e406f45..ee34bb10bf 100644 --- a/engine/api/workflow_run_test.go +++ b/engine/api/workflow_run_test.go @@ -2451,16 +2451,10 @@ func initGetWorkflowNodeRunJobTest(t *testing.T, api *API, db *gorp.DbMap) (*sdk require.NoError(t, errUJ) // Add log - require.NoError(t, workflow.AddLog(api.mustDB(), jobRun, &sdk.Log{ - StepOrder: 1, - Val: "1234567890", - }, 15)) + require.NoError(t, workflow.AppendLog(api.mustDB(), jobRun.ID, jobRun.WorkflowNodeRunID, 1, "1234567890", 15)) // Add truncated log - require.NoError(t, workflow.AddLog(api.mustDB(), jobRun, &sdk.Log{ - StepOrder: 1, - Val: "1234567890", - }, 15)) + require.NoError(t, workflow.AppendLog(api.mustDB(), jobRun.ID, jobRun.WorkflowNodeRunID, 1, "1234567890", 15)) // Add service log require.NoError(t, workflow.AddServiceLog(api.mustDB(), jobRun, &sdk.ServiceLog{ diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go index d079f627df..7fbd9e8004 100644 --- a/engine/cdn/cdn_log.go +++ b/engine/cdn/cdn_log.go @@ -238,15 +238,7 @@ func buildMessage(signature log.Signature, m hook.Message) string { } func (s *Service) processLog(ctx context.Context, db gorp.SqlExecutor, signature log.Signature, message string) error { - now := time.Now() - l := sdk.Log{ - JobID: signature.JobID, - NodeRunID: signature.NodeRunID, - LastModified: &now, - StepOrder: signature.Worker.StepOrder, - Val: message, - } - return workflow.AddLog(db, nil, &l, s.Cfg.Log.StepMaxSize) + return workflow.AppendLog(db, signature.JobID, signature.NodeRunID, signature.Worker.StepOrder, message, s.Cfg.Log.StepMaxSize) } func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {