diff --git a/engine/hooks/dao.go b/engine/hooks/dao.go index 4cecd87814..4dc626512d 100644 --- a/engine/hooks/dao.go +++ b/engine/hooks/dao.go @@ -3,6 +3,7 @@ package hooks import ( "context" "fmt" + "sync/atomic" "github.com/ovh/cds/engine/api/cache" "github.com/ovh/cds/sdk" @@ -15,7 +16,21 @@ const ( ) type dao struct { - store cache.Store + store cache.Store + enqueuedTaskExecutions int64 + dequeuedTaskExecutions int64 +} + +func (d *dao) enqueuedIncr() { + atomic.AddInt64(&d.enqueuedTaskExecutions, 1) +} + +func (d *dao) dequeuedIncr() { + atomic.AddInt64(&d.dequeuedTaskExecutions, 1) +} + +func (d *dao) TaskExecutionsBalance() (int64, int64) { + return d.enqueuedTaskExecutions, d.dequeuedTaskExecutions } func (d *dao) FindAllTasks(ctx context.Context) ([]sdk.Task, error) { @@ -91,7 +106,13 @@ func (d *dao) EnqueueTaskExecution(ctx context.Context, r *sdk.TaskExecution) er if err := d.store.RemoveFromQueue(schedulerQueueKey, k); err != nil { log.Error(ctx, "error on cache RemoveFromQueue %s: %v", schedulerQueueKey, err) } - return d.store.Enqueue(schedulerQueueKey, k) + + if err := d.store.Enqueue(schedulerQueueKey, k); err != nil { + return err + } + d.enqueuedIncr() + + return nil } func (d *dao) QueueLen() (int, error) { diff --git a/engine/hooks/hooks.go b/engine/hooks/hooks.go index 08e44f7e06..8eb8175fd1 100644 --- a/engine/hooks/hooks.go +++ b/engine/hooks/hooks.go @@ -87,7 +87,7 @@ func (s *Service) Serve(c context.Context) error { } //Init the DAO - s.Dao = dao{s.Cache} + s.Dao = dao{store: s.Cache} // Get current maintenance state var b bool diff --git a/engine/hooks/hooks_handlers.go b/engine/hooks/hooks_handlers.go index e9de233ebd..26867c64ce 100644 --- a/engine/hooks/hooks_handlers.go +++ b/engine/hooks/hooks_handlers.go @@ -517,6 +517,14 @@ func (s *Service) Status(ctx context.Context) sdk.MonitoringStatus { } m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Queue", Value: fmt.Sprintf("%d", size), Status: status}) + // hook balance in status + in, out := s.Dao.TaskExecutionsBalance() + status = sdk.MonitoringStatusOK + if float64(in) > float64(out) { + status = sdk.MonitoringStatusWarn + } + m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Balance", Value: fmt.Sprintf("%d/%d", in, out), Status: status}) + var nbHooksKafkaTotal int64 tasks, err := s.Dao.FindAllTasks(ctx) diff --git a/engine/hooks/scheduler.go b/engine/hooks/scheduler.go index ba4187e2ec..f88c81b302 100644 --- a/engine/hooks/scheduler.go +++ b/engine/hooks/scheduler.go @@ -17,28 +17,28 @@ func (s *Service) runScheduler(c context.Context) error { go func() { if err := s.dequeueTaskExecutions(ctx); err != nil { - log.Error(ctx, "Hooks> runScheduler> dequeueLongRunningTasks> %v", err) + log.Error(ctx, "runScheduler> dequeueLongRunningTasks> %v", err) cancel() } }() go func() { if err := s.retryTaskExecutionsRoutine(ctx); err != nil { - log.Error(ctx, "Hooks> runScheduler> retryTaskExecutionsRoutine> %v", err) + log.Error(ctx, "runScheduler> retryTaskExecutionsRoutine> %v", err) cancel() } }() go func() { if err := s.enqueueScheduledTaskExecutionsRoutine(ctx); err != nil { - log.Error(ctx, "Hooks> runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err) + log.Error(ctx, "runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err) cancel() } }() go func() { if err := s.deleteTaskExecutionsRoutine(ctx); err != nil { - log.Error(ctx, "Hooks> runScheduler> deleteTaskExecutionsRoutine> %v", err) + log.Error(ctx, "runScheduler> deleteTaskExecutionsRoutine> %v", err) cancel() } }() @@ -58,29 +58,29 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error { case <-tick.C: size, err := s.Dao.QueueLen() if err != nil { - log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to get queueLen: %v", err) + log.Error(ctx, "retryTaskExecutionsRoutine > Unable to get queueLen: %v", err) continue } if size > 20 { - log.Warning(ctx, "Hooks> too many tasks in scheduler for now, skipped this retry ticker. size:%d", size) + log.Warning(ctx, "too many tasks in scheduler for now, skipped this retry ticker. size:%d", size) continue } if s.Maintenance { - log.Info(ctx, "Hooks> retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size) + log.Info(ctx, "retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size) time.Sleep(1 * time.Minute) continue } tasks, err := s.Dao.FindAllTasks(ctx) if err != nil { - log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all tasks: %v", err) + log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all tasks: %v", err) continue } for _, t := range tasks { execs, err := s.Dao.FindAllTaskExecutions(ctx, &t) if err != nil { - log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err) + log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err) continue } for _, e := range execs { @@ -91,37 +91,37 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error { // old hooks if e.ProcessingTimestamp == 0 && e.Timestamp < time.Now().Add(-2*time.Minute).UnixNano() { if e.UUID == "" { - log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError) + log.Warning(ctx, "retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError) continue } e.Status = TaskExecutionEnqueued if err := s.Dao.SaveTaskExecution(&e); err != nil { - log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err) + log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err) continue } - log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError) + log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError) if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil { - log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err) + log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err) } } if e.NbErrors < s.Cfg.RetryError && e.LastError != "" { // avoid re-enqueue if the lastError is about a git branch not found // the branch was deleted from git repository, it will never work if strings.Contains(e.LastError, "branchName parameter must be provided") { - log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError) + log.Warning(ctx, "retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError) if err := s.Dao.DeleteTaskExecution(&e); err != nil { - log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) } continue } e.Status = TaskExecutionEnqueued if err := s.Dao.SaveTaskExecution(&e); err != nil { - log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err) + log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err) continue } - log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError) + log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError) if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil { - log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err) + log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err) } continue } @@ -138,17 +138,18 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err for { select { case <-ctx.Done(): + log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > exiting goroutine: %v", ctx.Err()) return ctx.Err() case <-tick.C: tasks, err := s.Dao.FindAllTasks(ctx) if err != nil { - log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err) + log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err) continue } for _, t := range tasks { execs, err := s.Dao.FindAllTaskExecutions(ctx, &t) if err != nil { - log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err) + log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err) continue } alreadyEnqueued := false @@ -157,16 +158,16 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err // update status before enqueue // this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine) if alreadyEnqueued { - log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type) + log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type) if err := s.Dao.DeleteTaskExecution(&e); err != nil { - log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) } } else { e.Status = TaskExecutionEnqueued s.Dao.SaveTaskExecution(&e) - log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp) + log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp) if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil { - log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err) + log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err) } // this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine) if e.Type == TypeRepoPoller || e.Type == TypeScheduler { @@ -193,14 +194,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error { case <-tick.C: tasks, err := s.Dao.FindAllTasks(ctx) if err != nil { - log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err) + log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err) continue } for _, t := range tasks { taskToDelete := false execs, err := s.Dao.FindAllTaskExecutions(ctx, &t) if err != nil { - log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err) + log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err) continue } sort.Slice(execs, func(i, j int) bool { @@ -213,14 +214,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error { case TypeBranchDeletion: if e.Status == TaskExecutionDone && e.ProcessingTimestamp != 0 { if err := s.Dao.DeleteTaskExecution(&e); err != nil { - log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) } taskToDelete = true } default: if i >= s.Cfg.ExecutionHistory && e.ProcessingTimestamp != 0 { if err := s.Dao.DeleteTaskExecution(&e); err != nil { - log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err) } } } @@ -229,7 +230,7 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error { if taskToDelete { if err := s.deleteTask(ctx, &t); err != nil { - log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err) + log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err) } } } @@ -241,14 +242,15 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error { func (s *Service) dequeueTaskExecutions(ctx context.Context) error { for { if ctx.Err() != nil { + log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", ctx.Err()) return ctx.Err() } size, err := s.Dao.QueueLen() if err != nil { - log.Error(ctx, "Hooks> dequeueTaskExecutions > Unable to get queueLen: %v", err) + log.Error(ctx, "dequeueTaskExecutions > Unable to get queueLen: %v", err) continue } - log.Debug("Hooks> dequeueTaskExecutions> current queue size: %d", size) + log.Debug("dequeueTaskExecutions> current queue size: %d", size) if s.Maintenance { log.Info(ctx, "Maintenance enable, wait 1 minute. Queue %d", size) @@ -259,13 +261,15 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error { // Dequeuing context var taskKey string if ctx.Err() != nil { + log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", err) return ctx.Err() } if err := s.Cache.DequeueWithContext(ctx, schedulerQueueKey, &taskKey); err != nil { - log.Error(ctx, "Hooks> dequeueTaskExecutions> store.DequeueWithContext err: %v", err) + log.Error(ctx, "dequeueTaskExecutions> store.DequeueWithContext err: %v", err) continue } - log.Debug("Hooks> dequeueTaskExecutions> work on taskKey: %s", taskKey) + s.Dao.dequeuedIncr() + log.Info(ctx, "dequeueTaskExecutions> work on taskKey: %s", taskKey) // Load the task execution var t = sdk.TaskExecution{} @@ -286,18 +290,18 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error { task := s.Dao.FindTask(ctx, t.UUID) if task == nil { - log.Error(ctx, "Hooks> dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID) + log.Error(ctx, "dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID) t.LastError = "Internal Error: Task not found" t.NbErrors++ if err := s.Dao.DeleteTaskExecution(&t); err != nil { - log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err) } continue } else if t.NbErrors >= s.Cfg.RetryError { - log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError) + log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError) if err := s.Dao.DeleteTaskExecution(&t); err != nil { - log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err) } continue @@ -307,19 +311,19 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error { saveTaskExecution = true } else { saveTaskExecution = true - log.Debug("Hooks> dequeueTaskExecutions> call doTask on taskKey: %s", taskKey) + log.Debug("dequeueTaskExecutions> call doTask on taskKey: %s", taskKey) var err error restartTask, err = s.doTask(ctx, task, &t) if err != nil { if strings.Contains(err.Error(), "Unsupported task type") { // delete this task execution, as it will never work - log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err) + log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err) if err := s.Dao.DeleteTaskExecution(&t); err != nil { - log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err) + log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err) } continue } else { - log.Error(ctx, "Hooks> dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err) + log.Error(ctx, "dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err) t.LastError = err.Error() t.NbErrors++ saveTaskExecution = true @@ -336,7 +340,10 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error { //Start (or restart) the task if restartTask { - _, _ = s.startTask(ctx, task) + _, err := s.startTask(ctx, task) + if err != nil { + log.Error(ctx, "dequeueTaskExecutions> unable to restart the task %+v after execution: %v", task, err) + } } } }