Skip to content

Commit

Permalink
feat(hooks): add logs and balance in/out in status (#5131)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored Apr 15, 2020
1 parent bae5d4e commit 7c9d5fc
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 45 deletions.
25 changes: 23 additions & 2 deletions engine/hooks/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hooks
import (
"context"
"fmt"
"sync/atomic"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/sdk"
Expand All @@ -15,7 +16,21 @@ const (
)

type dao struct {
store cache.Store
store cache.Store
enqueuedTaskExecutions int64
dequeuedTaskExecutions int64
}

func (d *dao) enqueuedIncr() {
atomic.AddInt64(&d.enqueuedTaskExecutions, 1)
}

func (d *dao) dequeuedIncr() {
atomic.AddInt64(&d.dequeuedTaskExecutions, 1)
}

func (d *dao) TaskExecutionsBalance() (int64, int64) {
return d.enqueuedTaskExecutions, d.dequeuedTaskExecutions
}

func (d *dao) FindAllTasks(ctx context.Context) ([]sdk.Task, error) {
Expand Down Expand Up @@ -91,7 +106,13 @@ func (d *dao) EnqueueTaskExecution(ctx context.Context, r *sdk.TaskExecution) er
if err := d.store.RemoveFromQueue(schedulerQueueKey, k); err != nil {
log.Error(ctx, "error on cache RemoveFromQueue %s: %v", schedulerQueueKey, err)
}
return d.store.Enqueue(schedulerQueueKey, k)

if err := d.store.Enqueue(schedulerQueueKey, k); err != nil {
return err
}
d.enqueuedIncr()

return nil
}

func (d *dao) QueueLen() (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion engine/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Service) Serve(c context.Context) error {
}

//Init the DAO
s.Dao = dao{s.Cache}
s.Dao = dao{store: s.Cache}

// Get current maintenance state
var b bool
Expand Down
8 changes: 8 additions & 0 deletions engine/hooks/hooks_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,14 @@ func (s *Service) Status(ctx context.Context) sdk.MonitoringStatus {
}
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Queue", Value: fmt.Sprintf("%d", size), Status: status})

// hook balance in status
in, out := s.Dao.TaskExecutionsBalance()
status = sdk.MonitoringStatusOK
if float64(in) > float64(out) {
status = sdk.MonitoringStatusWarn
}
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Balance", Value: fmt.Sprintf("%d/%d", in, out), Status: status})

var nbHooksKafkaTotal int64

tasks, err := s.Dao.FindAllTasks(ctx)
Expand Down
91 changes: 49 additions & 42 deletions engine/hooks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@ func (s *Service) runScheduler(c context.Context) error {

go func() {
if err := s.dequeueTaskExecutions(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> dequeueLongRunningTasks> %v", err)
log.Error(ctx, "runScheduler> dequeueLongRunningTasks> %v", err)
cancel()
}
}()

go func() {
if err := s.retryTaskExecutionsRoutine(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> retryTaskExecutionsRoutine> %v", err)
log.Error(ctx, "runScheduler> retryTaskExecutionsRoutine> %v", err)
cancel()
}
}()

go func() {
if err := s.enqueueScheduledTaskExecutionsRoutine(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err)
log.Error(ctx, "runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err)
cancel()
}
}()

go func() {
if err := s.deleteTaskExecutionsRoutine(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> deleteTaskExecutionsRoutine> %v", err)
log.Error(ctx, "runScheduler> deleteTaskExecutionsRoutine> %v", err)
cancel()
}
}()
Expand All @@ -58,29 +58,29 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
case <-tick.C:
size, err := s.Dao.QueueLen()
if err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to get queueLen: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to get queueLen: %v", err)
continue
}
if size > 20 {
log.Warning(ctx, "Hooks> too many tasks in scheduler for now, skipped this retry ticker. size:%d", size)
log.Warning(ctx, "too many tasks in scheduler for now, skipped this retry ticker. size:%d", size)
continue
}

if s.Maintenance {
log.Info(ctx, "Hooks> retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size)
log.Info(ctx, "retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size)
time.Sleep(1 * time.Minute)
continue
}

tasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all tasks: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all tasks: %v", err)
continue
}
for _, t := range tasks {
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
if err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
continue
}
for _, e := range execs {
Expand All @@ -91,37 +91,37 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
// old hooks
if e.ProcessingTimestamp == 0 && e.Timestamp < time.Now().Add(-2*time.Minute).UnixNano() {
if e.UUID == "" {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
continue
}
e.Status = TaskExecutionEnqueued
if err := s.Dao.SaveTaskExecution(&e); err != nil {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err)
log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err)
continue
}
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
}
}
if e.NbErrors < s.Cfg.RetryError && e.LastError != "" {
// avoid re-enqueue if the lastError is about a git branch not found
// the branch was deleted from git repository, it will never work
if strings.Contains(e.LastError, "branchName parameter must be provided") {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
continue
}
e.Status = TaskExecutionEnqueued
if err := s.Dao.SaveTaskExecution(&e); err != nil {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err)
log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err)
continue
}
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
}
continue
}
Expand All @@ -138,17 +138,18 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
for {
select {
case <-ctx.Done():
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > exiting goroutine: %v", ctx.Err())
return ctx.Err()
case <-tick.C:
tasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err)
continue
}
for _, t := range tasks {
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
if err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
continue
}
alreadyEnqueued := false
Expand All @@ -157,16 +158,16 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
// update status before enqueue
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
if alreadyEnqueued {
log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type)
log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type)
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
} else {
e.Status = TaskExecutionEnqueued
s.Dao.SaveTaskExecution(&e)
log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp)
log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp)
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
}
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
if e.Type == TypeRepoPoller || e.Type == TypeScheduler {
Expand All @@ -193,14 +194,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
case <-tick.C:
tasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err)
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err)
continue
}
for _, t := range tasks {
taskToDelete := false
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
if err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
continue
}
sort.Slice(execs, func(i, j int) bool {
Expand All @@ -213,14 +214,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
case TypeBranchDeletion:
if e.Status == TaskExecutionDone && e.ProcessingTimestamp != 0 {
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
taskToDelete = true
}
default:
if i >= s.Cfg.ExecutionHistory && e.ProcessingTimestamp != 0 {
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
}
}
Expand All @@ -229,7 +230,7 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {

if taskToDelete {
if err := s.deleteTask(ctx, &t); err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err)
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err)
}
}
}
Expand All @@ -241,14 +242,15 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
for {
if ctx.Err() != nil {
log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", ctx.Err())
return ctx.Err()
}
size, err := s.Dao.QueueLen()
if err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > Unable to get queueLen: %v", err)
log.Error(ctx, "dequeueTaskExecutions > Unable to get queueLen: %v", err)
continue
}
log.Debug("Hooks> dequeueTaskExecutions> current queue size: %d", size)
log.Debug("dequeueTaskExecutions> current queue size: %d", size)

if s.Maintenance {
log.Info(ctx, "Maintenance enable, wait 1 minute. Queue %d", size)
Expand All @@ -259,13 +261,15 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
// Dequeuing context
var taskKey string
if ctx.Err() != nil {
log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", err)
return ctx.Err()
}
if err := s.Cache.DequeueWithContext(ctx, schedulerQueueKey, &taskKey); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions> store.DequeueWithContext err: %v", err)
log.Error(ctx, "dequeueTaskExecutions> store.DequeueWithContext err: %v", err)
continue
}
log.Debug("Hooks> dequeueTaskExecutions> work on taskKey: %s", taskKey)
s.Dao.dequeuedIncr()
log.Info(ctx, "dequeueTaskExecutions> work on taskKey: %s", taskKey)

// Load the task execution
var t = sdk.TaskExecution{}
Expand All @@ -286,18 +290,18 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {

task := s.Dao.FindTask(ctx, t.UUID)
if task == nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID)
log.Error(ctx, "dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID)
t.LastError = "Internal Error: Task not found"
t.NbErrors++
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
}
continue

} else if t.NbErrors >= s.Cfg.RetryError {
log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError)
log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError)
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
}
continue

Expand All @@ -307,19 +311,19 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
saveTaskExecution = true
} else {
saveTaskExecution = true
log.Debug("Hooks> dequeueTaskExecutions> call doTask on taskKey: %s", taskKey)
log.Debug("dequeueTaskExecutions> call doTask on taskKey: %s", taskKey)
var err error
restartTask, err = s.doTask(ctx, task, &t)
if err != nil {
if strings.Contains(err.Error(), "Unsupported task type") {
// delete this task execution, as it will never work
log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err)
log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err)
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
}
continue
} else {
log.Error(ctx, "Hooks> dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err)
log.Error(ctx, "dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err)
t.LastError = err.Error()
t.NbErrors++
saveTaskExecution = true
Expand All @@ -336,7 +340,10 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {

//Start (or restart) the task
if restartTask {
_, _ = s.startTask(ctx, task)
_, err := s.startTask(ctx, task)
if err != nil {
log.Error(ctx, "dequeueTaskExecutions> unable to restart the task %+v after execution: %v", task, err)
}
}
}
}

0 comments on commit 7c9d5fc

Please sign in to comment.