Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): append log when retry job #5294

Merged
merged 2 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func (a *API) Serve(ctx context.Context) error {
}, a.PanicDump())
sdk.GoRoutine(ctx, "workflow.Initialize",
func(ctx context.Context) {
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch, a.Config.Log.StepMaxSize)
}, a.PanicDump())
sdk.GoRoutine(ctx, "PushInElasticSearch",
func(ctx context.Context) {
Expand Down
8 changes: 4 additions & 4 deletions engine/api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (api *API) disableWorkerHandler() service.Handler {
}
}

if err := DisableWorker(ctx, api.mustDB(), id); err != nil {
if err := DisableWorker(ctx, api.mustDB(), id, api.Config.Log.StepMaxSize); err != nil {
cause := sdk.Cause(err)
if cause == worker.ErrNoWorker || cause == sql.ErrNoRows {
return sdk.WrapError(sdk.ErrWrongRequest, "disableWorkerHandler> worker %s does not exists", id)
Expand Down Expand Up @@ -190,7 +190,7 @@ func (api *API) postUnregisterWorkerHandler() service.Handler {
if err != nil {
return err
}
if err := DisableWorker(ctx, api.mustDB(), wk.ID); err != nil {
if err := DisableWorker(ctx, api.mustDB(), wk.ID, api.Config.Log.StepMaxSize); err != nil {
return sdk.WrapError(err, "cannot delete worker %s", wk.Name)
}
return nil
Expand Down Expand Up @@ -224,7 +224,7 @@ func (api *API) workerWaitingHandler() service.Handler {
// the package workflow

// DisableWorker disable a worker
func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error {
func DisableWorker(ctx context.Context, db *gorp.DbMap, id string, maxLogSize int64) error {
tx, errb := db.Begin()
if errb != nil {
return fmt.Errorf("DisableWorker> Cannot start tx: %v", errb)
Expand All @@ -244,7 +244,7 @@ func DisableWorker(ctx context.Context, db *gorp.DbMap, id string) error {
// We need to restart this action
wNodeJob, errL := workflow.LoadNodeJobRun(ctx, tx, nil, jobID.Int64)
if errL == nil && wNodeJob.Retry < 3 {
if err := workflow.RestartWorkflowNodeJob(context.TODO(), db, *wNodeJob); err != nil {
if err := workflow.RestartWorkflowNodeJob(context.TODO(), db, *wNodeJob, maxLogSize); err != nil {
log.Warning(ctx, "DisableWorker[%s]> Cannot restart workflow node run: %v", name, err)
} else {
log.Info(ctx, "DisableWorker[%s]> WorkflowNodeRun %d restarted after crash", name, jobID.Int64)
Expand Down
59 changes: 28 additions & 31 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,21 @@ func FreeNodeJobRun(ctx context.Context, store cache.Store, id int64) error {
return sdk.WrapError(sdk.ErrJobNotBooked, "BookNodeJobRun> job %d already released", id)
}

//AddLog adds a build log
func AddLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.Log, maxLogSize int64) error {
if job != nil {
logs.JobID = job.ID
logs.NodeRunID = job.WorkflowNodeRunID
}

// AppendLog adds a build log.
func AppendLog(db gorp.SqlExecutor, jobID, nodeRunID, stepOrder int64, val string, maxLogSize int64) error {
// check if log exists without loading data but with log size
exists, size, err := ExistsStepLog(db, logs.JobID, logs.StepOrder)
exists, size, err := ExistsStepLog(db, jobID, stepOrder)
if err != nil {
return sdk.WrapError(err, "cannot check if log exists")
}

logs := &sdk.Log{
JobID: jobID,
NodeRunID: nodeRunID,
StepOrder: stepOrder,
Val: val,
}

// ignore the log if max size already reached
if maxReached := truncateLogs(maxLogSize, size, logs); maxReached {
log.Debug("truncated logs")
Expand Down Expand Up @@ -628,7 +630,7 @@ func AddServiceLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.S
}

// RestartWorkflowNodeJob restart all workflow node job and update logs to indicate restart
func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun) error {
func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun, maxLogSize int64) error {
var end func()
ctx, end = observability.Span(ctx, "workflow.RestartWorkflowNodeJob")
defer end()
Expand All @@ -638,43 +640,38 @@ func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob s
if step.Status == sdk.StatusNeverBuilt || step.Status == sdk.StatusSkipped || step.Status == sdk.StatusDisabled {
continue
}
l, errL := LoadStepLogs(db, wNodeJob.ID, int64(step.StepOrder))
if errL != nil {
return sdk.WrapError(errL, "RestartWorkflowNodeJob> error while load step logs")
}
wNodeJob.Job.Reason = "Killed (Reason: Timeout)\n"
step.Status = sdk.StatusWaiting
step.Done = time.Time{}
if l != nil { // log could be nil here
l.Done = nil
logbuf := bytes.NewBufferString(l.Val)
logbuf.WriteString("\n\n\n-=-=-=-=-=- Worker timeout: job replaced in queue -=-=-=-=-=-\n\n\n")
l.Val = logbuf.String()
if err := updateLog(db, l); err != nil {
return sdk.WrapError(errL, "RestartWorkflowNodeJob> error while update step log")
}
if err := AppendLog(
db, wNodeJob.ID, wNodeJob.WorkflowNodeRunID, int64(step.StepOrder),
"\n\n\n-=-=-=-=-=- Worker timeout: job replaced in queue -=-=-=-=-=-\n\n\n",
maxLogSize,
); err != nil {
return err
}
}
nodeRun, errNR := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID)
if errNR != nil {
return errNR

nodeRun, err := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID)
if err != nil {
return err
}

//Synchronize struct but not in db
sync, errS := SyncNodeRunRunJob(ctx, db, nodeRun, wNodeJob)
if errS != nil {
return sdk.WrapError(errS, "RestartWorkflowNodeJob> error on sync nodeJobRun")
sync, err := SyncNodeRunRunJob(ctx, db, nodeRun, wNodeJob)
if err != nil {
return sdk.WrapError(err, "error on sync nodeJobRun")
}
if !sync {
log.Warning(ctx, "RestartWorkflowNodeJob> sync doesn't find a nodeJobRun")
log.Warning(ctx, "sync doesn't find a nodeJobRun")
}

if errU := UpdateNodeRun(db, nodeRun); errU != nil {
return sdk.WrapError(errU, "RestartWorkflowNodeJob> Cannot update node run")
if err := UpdateNodeRun(db, nodeRun); err != nil {
return sdk.WrapError(err, "cannot update node run")
}

if err := replaceWorkflowJobRunInQueue(db, wNodeJob); err != nil {
return sdk.WrapError(err, "Cannot replace workflow job in queue")
return sdk.WrapError(err, "cannot replace workflow job in queue")
}

return nil
Expand Down
24 changes: 10 additions & 14 deletions engine/api/workflow/execute_node_job_run_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func LoadLogs(db gorp.SqlExecutor, id int64) ([]sdk.Log, error) {
}

func insertLog(db gorp.SqlExecutor, logs *sdk.Log) error {
now := time.Now()
logs.Start = &now
logs.LastModified = &now

query := `
INSERT INTO workflow_node_run_job_logs (workflow_node_run_job_id, workflow_node_run_id, start, last_modified, done, step_order, value)
VALUES ($1, $2, $3, $4, $5, $6, $7)
Expand All @@ -105,26 +109,18 @@ func insertLog(db gorp.SqlExecutor, logs *sdk.Log) error {

func updateLog(db gorp.SqlExecutor, logs *sdk.Log) error {
now := time.Now()
if logs.Start == nil {
logs.Start = &now
}
if logs.LastModified == nil {
logs.LastModified = &now
}
if logs.Done == nil {
logs.Done = &now
}
logs.LastModified = &now
logs.Done = &now

query := `
UPDATE workflow_node_run_job_logs set
workflow_node_run_id = $3,
start = $4,
last_modified = $5,
done = $6,
value = value || $7
last_modified = $4,
done = $5,
value = value || $6
WHERE workflow_node_run_job_id = $1 AND step_order = $2`

if _, err := db.Exec(query, logs.JobID, logs.StepOrder, logs.NodeRunID, logs.Start, logs.LastModified, logs.Done, logs.Val); err != nil {
if _, err := db.Exec(query, logs.JobID, logs.StepOrder, logs.NodeRunID, logs.LastModified, logs.Done, logs.Val); err != nil {
return sdk.WithStack(err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
const maxRetry = 3

// manageDeadJob restart all jobs which are building but without worker
func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) error {
func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, maxLogSize int64) error {
db := DBFunc()
deadJobs, err := LoadDeadNodeJobRun(ctx, db, store)
if err != nil {
Expand Down Expand Up @@ -41,7 +41,7 @@ func manageDeadJob(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.S
continue
}
} else {
if err := RestartWorkflowNodeJob(ctx, tx, deadJob); err != nil {
if err := RestartWorkflowNodeJob(ctx, tx, deadJob, maxLogSize); err != nil {
log.Warning(ctx, "manageDeadJob> Cannot restart node job run %d: %v", deadJob.ID, err)
_ = tx.Rollback()
continue
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
var baseUIURL, defaultOS, defaultArch string

//Initialize starts goroutines for workflows
func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, uiURL, confDefaultOS, confDefaultArch string) {
func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store, uiURL, confDefaultOS, confDefaultArch string, maxLogSize int64) {
baseUIURL = uiURL
defaultOS = confDefaultOS
defaultArch = confDefaultArch
Expand All @@ -31,7 +31,7 @@ func Initialize(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Stor
return
}
case <-tickHeart.C:
if err := manageDeadJob(ctx, DBFunc, store); err != nil {
if err := manageDeadJob(ctx, DBFunc, store, maxLogSize); err != nil {
log.Warning(ctx, "workflow.manageDeadJob> Error on restartDeadJob : %v", err)
}
case <-tickStop.C:
Expand Down
8 changes: 2 additions & 6 deletions engine/api/workflow/run_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,16 +634,12 @@ queueRun:
assert.Len(t, secrets, 1)

//TestAddLog
assert.NoError(t, workflow.AddLog(db, j, &sdk.Log{
Val: "This is a log",
}, workflow.DefaultMaxLogSize))
assert.NoError(t, workflow.AppendLog(db, j.ID, j.WorkflowNodeRunID, 1, "This is a log", workflow.DefaultMaxLogSize))
if t.Failed() {
tx.Rollback()
t.FailNow()
}
assert.NoError(t, workflow.AddLog(db, j, &sdk.Log{
Val: "This is another log",
}, workflow.DefaultMaxLogSize))
assert.NoError(t, workflow.AppendLog(db, j.ID, j.WorkflowNodeRunID, 1, "This is another log", workflow.DefaultMaxLogSize))
if t.Failed() {
tx.Rollback()
t.FailNow()
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (api *API) postWorkflowJobLogsHandler() service.Handler {
return err
}

if err := workflow.AddLog(api.mustDB(), pbJob, &logs, api.Config.Log.StepMaxSize); err != nil {
if err := workflow.AppendLog(api.mustDB(), pbJob.ID, pbJob.WorkflowNodeRunID, logs.StepOrder, logs.Val, api.Config.Log.StepMaxSize); err != nil {
return err
}

Expand Down
10 changes: 2 additions & 8 deletions engine/api/workflow_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2451,16 +2451,10 @@ func initGetWorkflowNodeRunJobTest(t *testing.T, api *API, db *gorp.DbMap) (*sdk
require.NoError(t, errUJ)

// Add log
require.NoError(t, workflow.AddLog(api.mustDB(), jobRun, &sdk.Log{
StepOrder: 1,
Val: "1234567890",
}, 15))
require.NoError(t, workflow.AppendLog(api.mustDB(), jobRun.ID, jobRun.WorkflowNodeRunID, 1, "1234567890", 15))

// Add truncated log
require.NoError(t, workflow.AddLog(api.mustDB(), jobRun, &sdk.Log{
StepOrder: 1,
Val: "1234567890",
}, 15))
require.NoError(t, workflow.AppendLog(api.mustDB(), jobRun.ID, jobRun.WorkflowNodeRunID, 1, "1234567890", 15))

// Add service log
require.NoError(t, workflow.AddServiceLog(api.mustDB(), jobRun, &sdk.ServiceLog{
Expand Down
10 changes: 1 addition & 9 deletions engine/cdn/cdn_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,7 @@ func buildMessage(signature log.Signature, m hook.Message) string {
}

func (s *Service) processLog(ctx context.Context, db gorp.SqlExecutor, signature log.Signature, message string) error {
now := time.Now()
l := sdk.Log{
JobID: signature.JobID,
NodeRunID: signature.NodeRunID,
LastModified: &now,
StepOrder: signature.Worker.StepOrder,
Val: message,
}
return workflow.AddLog(db, nil, &l, s.Cfg.Log.StepMaxSize)
return workflow.AppendLog(db, signature.JobID, signature.NodeRunID, signature.Worker.StepOrder, message, s.Cfg.Log.StepMaxSize)
}

func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
Expand Down