Skip to content

Commit

Permalink
fix(api): append log when retry job (#5294)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Jul 3, 2020
1 parent 5a9f2dd commit ee08e26
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 78 deletions.
2 changes: 1 addition & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func (a *API) Serve(ctx context.Context) error {
}, a.PanicDump())
sdk.GoRoutine(ctx, "workflow.Initialize",
func(ctx context.Context) {
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch, a.Config.Log.StepMaxSize)
}, a.PanicDump())
sdk.GoRoutine(ctx, "PushInElasticSearch",
func(ctx context.Context) {
Expand Down
8 changes: 4 additions & 4 deletions engine/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (api *API) disableWorkerHandler() service.Handler {
}
}

if err := DisableWorker(ctx, api.mustDB(), id); err != nil {
if err := DisableWorker(ctx, api.mustDB(), id, api.Config.Log.StepMaxSize); err != nil {
cause := sdk.Cause(err)
if cause == worker.ErrNoWorker || cause == sql.ErrNoRows {
return sdk.WrapError(sdk.ErrWrongRequest, "disableWorkerHandler> worker %s does not exists", id)
Expand Down Expand Up @@ -190,7 +190,7 @@ func (api *API) postUnregisterWorkerHandler() service.Handler {
if err != nil {
return err
}
if err := DisableWorker(ctx, api.mustDB(), wk.ID); err != nil {
if err := DisableWorker(ctx, api.mustDB(), wk.ID, api.Config.Log.StepMaxSize); err != nil {
return sdk.WrapError(err, "cannot delete worker %s", wk.Name)
}
return nil
Expand Down Expand Up @@ -224,7 +224,7 @@ func (api *API) workerWaitingHandler() service.Handler {
// the package workflow

// DisableWorker disable a worker
func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error {
func DisableWorker(ctx context.Context, db *gorp.DbMap, id string, maxLogSize int64) error {
tx, errb := db.Begin()
if errb != nil {
return fmt.Errorf("DisableWorker> Cannot start tx: %v", errb)
Expand All @@ -244,7 +244,7 @@ func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error {
// We need to restart this action
wNodeJob, errL := workflow.LoadNodeJobRun(ctx, tx, nil, jobID.Int64)
if errL == nil && wNodeJob.Retry < 3 {
if err := workflow.RestartWorkflowNodeJob(context.TODO(), db, *wNodeJob); err != nil {
if err := workflow.RestartWorkflowNodeJob(context.TODO(), db, *wNodeJob, maxLogSize); err != nil {
log.Warning(ctx, "DisableWorker[%s]> Cannot restart workflow node run: %v", name, err)
} else {
log.Info(ctx, "DisableWorker[%s]> WorkflowNodeRun %d restarted after crash", name, jobID.Int64)
Expand Down
59 changes: 28 additions & 31 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,21 @@ func FreeNodeJobRun(ctx context.Context, store cache.Store, id int64) error {
return sdk.WrapError(sdk.ErrJobNotBooked, "BookNodeJobRun> job %d already released", id)
}

//AddLog adds a build log
func AddLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.Log, maxLogSize int64) error {
if job != nil {
logs.JobID = job.ID
logs.NodeRunID = job.WorkflowNodeRunID
}

// AppendLog adds a build log.
func AppendLog(db gorp.SqlExecutor, jobID, nodeRunID, stepOrder int64, val string, maxLogSize int64) error {
// check if log exists without loading data but with log size
exists, size, err := ExistsStepLog(db, logs.JobID, logs.StepOrder)
exists, size, err := ExistsStepLog(db, jobID, stepOrder)
if err != nil {
return sdk.WrapError(err, "cannot check if log exists")
}

logs := &sdk.Log{
JobID: jobID,
NodeRunID: nodeRunID,
StepOrder: stepOrder,
Val: val,
}

// ignore the log if max size already reached
if maxReached := truncateLogs(maxLogSize, size, logs); maxReached {
log.Debug("truncated logs")
Expand Down Expand Up @@ -628,7 +630,7 @@ func AddServiceLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.S
}

// RestartWorkflowNodeJob restart all workflow node job and update logs to indicate restart
func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun) error {
func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun, maxLogSize int64) error {
var end func()
ctx, end = observability.Span(ctx, "workflow.RestartWorkflowNodeJob")
defer end()
Expand All @@ -638,43 +640,38 @@ func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob s
if step.Status == sdk.StatusNeverBuilt || step.Status == sdk.StatusSkipped || step.Status == sdk.StatusDisabled {
continue
}
l, errL := LoadStepLogs(db, wNodeJob.ID, int64(step.StepOrder))
if errL != nil {
return sdk.WrapError(errL, "RestartWorkflowNodeJob> error while load step logs")
}
wNodeJob.Job.Reason = "Killed (Reason: Timeout)\n"
step.Status = sdk.StatusWaiting
step.Done = time.Time{}
if l != nil { // log could be nil here
l.Done = nil
logbuf := bytes.NewBufferString(l.Val)
logbuf.WriteString("\n\n\n-=-=-=-=-=- Worker timeout: job replaced in queue -=-=-=-=-=-\n\n\n")
l.Val = logbuf.String()
if err := updateLog(db, l); err != nil {
return sdk.WrapError(errL, "RestartWorkflowNodeJob> error while update step log")
}
if err := AppendLog(
db, wNodeJob.ID, wNodeJob.WorkflowNodeRunID, int64(step.StepOrder),
"\n\n\n-=-=-=-=-=- Worker timeout: job replaced in queue -=-=-=-=-=-\n\n\n",
maxLogSize,
); err != nil {
return err
}
}
nodeRun, errNR := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID)
if errNR != nil {
return errNR

nodeRun, err := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID)
if err != nil {
return err
}

//Synchronize struct but not in db
sync, errS := SyncNodeRunRunJob(ctx, db, nodeRun, wNodeJob)
if errS != nil {
return sdk.WrapError(errS, "RestartWorkflowNodeJob> error on sync nodeJobRun")
sync, err := SyncNodeRunRunJob(ctx, db, nodeRun, wNodeJob)
if err != nil {
return sdk.WrapError(err, "error on sync nodeJobRun")
}
if !sync {
log.Warning(ctx, "RestartWorkflowNodeJob> sync doesn't find a nodeJobRun")
log.Warning(ctx, "sync doesn't find a nodeJobRun")
}

if errU := UpdateNodeRun(db, nodeRun); errU != nil {
return sdk.WrapError(errU, "RestartWorkflowNodeJob> Cannot update node run")
if err := UpdateNodeRun(db, nodeRun); err != nil {
return sdk.WrapError(err, "cannot update node run")
}

if err := replaceWorkflowJobRunInQueue(db, wNodeJob); err != nil {
return sdk.WrapError(err, "Cannot replace workflow job in queue")
return sdk.WrapError(err, "cannot replace workflow job in queue")
}

return nil
Expand Down
24 changes: 10 additions & 14 deletions engine/api/workflow/execute_node_job_run_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func LoadLogs(db gorp.SqlExecutor, id int64) ([]sdk.Log, error) {
}

func insertLog(db gorp.SqlExecutor, logs *sdk.Log) error {
now := time.Now()
logs.Start = &now
logs.LastModified = &now

query := `
INSERT INTO workflow_node_run_job_logs (workflow_node_run_job_id, workflow_node_run_id, start, last_modified, done, step_order, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
Expand All @@ -105,26 +109,18 @@ func insertLog(db gorp.SqlExecutor, logs *sdk.Log) error {

func updateLog(db gorp.SqlExecutor, logs *sdk.Log) error {
now := time.Now()
if logs.Start == nil {
logs.Start = &now
}
if logs.LastModified == nil {
logs.LastModified = &now
}
if logs.Done == nil {
logs.Done = &now
}
logs.LastModified = &now
logs.Done = &now

query := `
UPDATE workflow_node_run_job_logs set
workflow_node_run_id = $3,
start = $4,
last_modified = $5,
done = $6,
value = value || $7
last_modified = $4,
done = $5,
value = value || $6
WHERE workflow_node_run_job_id = $1 AND step_order = $2`

if _, err := db.Exec(query, logs.JobID, logs.StepOrder, logs.NodeRunID, logs.Start, logs.LastModified, logs.Done, logs.Val); err != nil {
if _, err := db.Exec(query, logs.JobID, logs.StepOrder, logs.NodeRunID, logs.LastModified, logs.Done, logs.Val); err != nil {
return sdk.WithStack(err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
const maxRetry = 3

// manageDeadJob restart all jobs which are building but without worker
func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) error {
func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, maxLogSize int64) error {
db := DBFunc()
deadJobs, err := LoadDeadNodeJobRun(ctx, db, store)
if err != nil {
Expand Down Expand Up @@ -41,7 +41,7 @@ func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.S
continue
}
} else {
if err := RestartWorkflowNodeJob(ctx, tx, deadJob); err != nil {
if err := RestartWorkflowNodeJob(ctx, tx, deadJob, maxLogSize); err != nil {
log.Warning(ctx, "manageDeadJob> Cannot restart node job run %d: %v", deadJob.ID, err)
_ = tx.Rollback()
continue
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
var baseUIURL, defaultOS, defaultArch string

//Initialize starts goroutines for workflows
func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, uiURL, confDefaultOS, confDefaultArch string) {
func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, uiURL, confDefaultOS, confDefaultArch string, maxLogSize int64) {
baseUIURL = uiURL
defaultOS = confDefaultOS
defaultArch = confDefaultArch
Expand All @@ -31,7 +31,7 @@ func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Stor
return
}
case <-tickHeart.C:
if err := manageDeadJob(ctx, DBFunc, store); err != nil {
if err := manageDeadJob(ctx, DBFunc, store, maxLogSize); err != nil {
log.Warning(ctx, "workflow.manageDeadJob> Error on restartDeadJob : %v", err)
}
case <-tickStop.C:
Expand Down
8 changes: 2 additions & 6 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,16 +634,12 @@ queueRun:
assert.Len(t, secrets, 1)

//TestAddLog
assert.NoError(t, workflow.AddLog(db, j, &sdk.Log{
Val: "This is a log",
}, workflow.DefaultMaxLogSize))
assert.NoError(t, workflow.AppendLog(db, j.ID, j.WorkflowNodeRunID, 1, "This is a log", workflow.DefaultMaxLogSize))
if t.Failed() {
tx.Rollback()
t.FailNow()
}
assert.NoError(t, workflow.AddLog(db, j, &sdk.Log{
Val: "This is another log",
}, workflow.DefaultMaxLogSize))
assert.NoError(t, workflow.AppendLog(db, j.ID, j.WorkflowNodeRunID, 1, "This is another log", workflow.DefaultMaxLogSize))
if t.Failed() {
tx.Rollback()
t.FailNow()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (api *API) postWorkflowJobLogsHandler() service.Handler {
return err
}

if err := workflow.AddLog(api.mustDB(), pbJob, &logs, api.Config.Log.StepMaxSize); err != nil {
if err := workflow.AppendLog(api.mustDB(), pbJob.ID, pbJob.WorkflowNodeRunID, logs.StepOrder, logs.Val, api.Config.Log.StepMaxSize); err != nil {
return err
}

Expand Down
10 changes: 2 additions & 8 deletions engine/api/workflow_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2451,16 +2451,10 @@ func initGetWorkflowNodeRunJobTest(t *testing.T, api *API, db *gorp.DbMap) (*sdk
require.NoError(t, errUJ)

// Add log
require.NoError(t, workflow.AddLog(api.mustDB(), jobRun, &sdk.Log{
StepOrder: 1,
Val: "1234567890",
}, 15))
require.NoError(t, workflow.AppendLog(api.mustDB(), jobRun.ID, jobRun.WorkflowNodeRunID, 1, "1234567890", 15))

// Add truncated log
require.NoError(t, workflow.AddLog(api.mustDB(), jobRun, &sdk.Log{
StepOrder: 1,
Val: "1234567890",
}, 15))
require.NoError(t, workflow.AppendLog(api.mustDB(), jobRun.ID, jobRun.WorkflowNodeRunID, 1, "1234567890", 15))

// Add service log
require.NoError(t, workflow.AddServiceLog(api.mustDB(), jobRun, &sdk.ServiceLog{
Expand Down
10 changes: 1 addition & 9 deletions engine/cdn/cdn_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,7 @@ func buildMessage(signature log.Signature, m hook.Message) string {
}

func (s *Service) processLog(ctx context.Context, db gorp.SqlExecutor, signature log.Signature, message string) error {
now := time.Now()
l := sdk.Log{
JobID: signature.JobID,
NodeRunID: signature.NodeRunID,
LastModified: &now,
StepOrder: signature.Worker.StepOrder,
Val: message,
}
return workflow.AddLog(db, nil, &l, s.Cfg.Log.StepMaxSize)
return workflow.AppendLog(db, signature.JobID, signature.NodeRunID, signature.Worker.StepOrder, message, s.Cfg.Log.StepMaxSize)
}

func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
Expand Down

0 comments on commit ee08e26

Please sign in to comment.