Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hooks): add logs and balance in/out in status #5131

Merged
merged 2 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions engine/hooks/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hooks
import (
"context"
"fmt"
"sync/atomic"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/sdk"
Expand All @@ -15,7 +16,21 @@ const (
)

// dao is the hooks service data-access object. It wraps the cache store and
// keeps in-process counters of how many task executions went in and out of
// the scheduler queue.
type dao struct {
	// store is the cache backend used for queue operations
	// (RemoveFromQueue, Enqueue).
	store cache.Store
	// enqueuedTaskExecutions is incremented through enqueuedIncr using
	// sync/atomic; access it only through atomic operations.
	enqueuedTaskExecutions int64
	// dequeuedTaskExecutions is incremented through dequeuedIncr using
	// sync/atomic; access it only through atomic operations.
	dequeuedTaskExecutions int64
}

// enqueuedIncr atomically adds one to the enqueuedTaskExecutions counter.
func (d *dao) enqueuedIncr() {
	const step int64 = 1
	counter := &d.enqueuedTaskExecutions
	atomic.AddInt64(counter, step)
}

// dequeuedIncr atomically adds one to the dequeuedTaskExecutions counter.
func (d *dao) dequeuedIncr() {
	const step int64 = 1
	counter := &d.dequeuedTaskExecutions
	atomic.AddInt64(counter, step)
}

// TaskExecutionsBalance returns the number of task executions enqueued and
// dequeued so far, in that order.
//
// Both counters are incremented with atomic.AddInt64 from other goroutines,
// so they are read with atomic.LoadInt64 to avoid a data race. The two loads
// are independent: the pair is not a single consistent snapshot, and the
// counters may advance between the two reads.
func (d *dao) TaskExecutionsBalance() (int64, int64) {
	enqueued := atomic.LoadInt64(&d.enqueuedTaskExecutions)
	dequeued := atomic.LoadInt64(&d.dequeuedTaskExecutions)
	return enqueued, dequeued
}

func (d *dao) FindAllTasks(ctx context.Context) ([]sdk.Task, error) {
Expand Down Expand Up @@ -91,7 +106,13 @@ func (d *dao) EnqueueTaskExecution(ctx context.Context, r *sdk.TaskExecution) er
if err := d.store.RemoveFromQueue(schedulerQueueKey, k); err != nil {
log.Error(ctx, "error on cache RemoveFromQueue %s: %v", schedulerQueueKey, err)
}
return d.store.Enqueue(schedulerQueueKey, k)

if err := d.store.Enqueue(schedulerQueueKey, k); err != nil {
return err
}
d.enqueuedIncr()

return nil
}

func (d *dao) QueueLen() (int, error) {
Expand Down
2 changes: 1 addition & 1 deletion engine/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Service) Serve(c context.Context) error {
}

//Init the DAO
s.Dao = dao{s.Cache}
s.Dao = dao{store: s.Cache}

// Get current maintenance state
var b bool
Expand Down
8 changes: 8 additions & 0 deletions engine/hooks/hooks_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,14 @@ func (s *Service) Status(ctx context.Context) sdk.MonitoringStatus {
}
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Queue", Value: fmt.Sprintf("%d", size), Status: status})

// hook balance in status
in, out := s.Dao.TaskExecutionsBalance()
status = sdk.MonitoringStatusOK
if float64(in) > float64(out) {
status = sdk.MonitoringStatusWarn
}
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Balance", Value: fmt.Sprintf("%d/%d", in, out), Status: status})

var nbHooksKafkaTotal int64

tasks, err := s.Dao.FindAllTasks(ctx)
Expand Down
91 changes: 49 additions & 42 deletions engine/hooks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@ func (s *Service) runScheduler(c context.Context) error {

go func() {
if err := s.dequeueTaskExecutions(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> dequeueLongRunningTasks> %v", err)
log.Error(ctx, "runScheduler> dequeueLongRunningTasks> %v", err)
cancel()
}
}()

go func() {
if err := s.retryTaskExecutionsRoutine(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> retryTaskExecutionsRoutine> %v", err)
log.Error(ctx, "runScheduler> retryTaskExecutionsRoutine> %v", err)
cancel()
}
}()

go func() {
if err := s.enqueueScheduledTaskExecutionsRoutine(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err)
log.Error(ctx, "runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err)
cancel()
}
}()

go func() {
if err := s.deleteTaskExecutionsRoutine(ctx); err != nil {
log.Error(ctx, "Hooks> runScheduler> deleteTaskExecutionsRoutine> %v", err)
log.Error(ctx, "runScheduler> deleteTaskExecutionsRoutine> %v", err)
cancel()
}
}()
Expand All @@ -58,29 +58,29 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
case <-tick.C:
size, err := s.Dao.QueueLen()
if err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to get queueLen: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to get queueLen: %v", err)
continue
}
if size > 20 {
log.Warning(ctx, "Hooks> too many tasks in scheduler for now, skipped this retry ticker. size:%d", size)
log.Warning(ctx, "too many tasks in scheduler for now, skipped this retry ticker. size:%d", size)
continue
}

if s.Maintenance {
log.Info(ctx, "Hooks> retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size)
log.Info(ctx, "retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size)
time.Sleep(1 * time.Minute)
continue
}

tasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all tasks: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all tasks: %v", err)
continue
}
for _, t := range tasks {
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
if err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
continue
}
for _, e := range execs {
Expand All @@ -91,37 +91,37 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
// old hooks
if e.ProcessingTimestamp == 0 && e.Timestamp < time.Now().Add(-2*time.Minute).UnixNano() {
if e.UUID == "" {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
continue
}
e.Status = TaskExecutionEnqueued
if err := s.Dao.SaveTaskExecution(&e); err != nil {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err)
log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err)
continue
}
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
}
}
if e.NbErrors < s.Cfg.RetryError && e.LastError != "" {
// avoid re-enqueue if the lastError is about a git branch not found
// the branch was deleted from git repository, it will never work
if strings.Contains(e.LastError, "branchName parameter must be provided") {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
continue
}
e.Status = TaskExecutionEnqueued
if err := s.Dao.SaveTaskExecution(&e); err != nil {
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err)
log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err)
continue
}
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
}
continue
}
Expand All @@ -138,17 +138,18 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
for {
select {
case <-ctx.Done():
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > exiting goroutine: %v", ctx.Err())
return ctx.Err()
case <-tick.C:
tasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err)
continue
}
for _, t := range tasks {
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
if err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
continue
}
alreadyEnqueued := false
Expand All @@ -157,16 +158,16 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
// update status before enqueue
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
if alreadyEnqueued {
log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type)
log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type)
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
} else {
e.Status = TaskExecutionEnqueued
s.Dao.SaveTaskExecution(&e)
log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp)
log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp)
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
}
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
if e.Type == TypeRepoPoller || e.Type == TypeScheduler {
Expand All @@ -193,14 +194,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
case <-tick.C:
tasks, err := s.Dao.FindAllTasks(ctx)
if err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err)
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err)
continue
}
for _, t := range tasks {
taskToDelete := false
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
if err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
continue
}
sort.Slice(execs, func(i, j int) bool {
Expand All @@ -213,14 +214,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
case TypeBranchDeletion:
if e.Status == TaskExecutionDone && e.ProcessingTimestamp != 0 {
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
taskToDelete = true
}
default:
if i >= s.Cfg.ExecutionHistory && e.ProcessingTimestamp != 0 {
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
}
}
}
Expand All @@ -229,7 +230,7 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {

if taskToDelete {
if err := s.deleteTask(ctx, &t); err != nil {
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err)
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err)
}
}
}
Expand All @@ -241,14 +242,15 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
for {
if ctx.Err() != nil {
log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", ctx.Err())
return ctx.Err()
}
size, err := s.Dao.QueueLen()
if err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > Unable to get queueLen: %v", err)
log.Error(ctx, "dequeueTaskExecutions > Unable to get queueLen: %v", err)
continue
}
log.Debug("Hooks> dequeueTaskExecutions> current queue size: %d", size)
log.Debug("dequeueTaskExecutions> current queue size: %d", size)

if s.Maintenance {
log.Info(ctx, "Maintenance enable, wait 1 minute. Queue %d", size)
Expand All @@ -259,13 +261,15 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
// Dequeuing context
var taskKey string
if ctx.Err() != nil {
log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", err)
return ctx.Err()
}
if err := s.Cache.DequeueWithContext(ctx, schedulerQueueKey, &taskKey); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions> store.DequeueWithContext err: %v", err)
log.Error(ctx, "dequeueTaskExecutions> store.DequeueWithContext err: %v", err)
continue
}
log.Debug("Hooks> dequeueTaskExecutions> work on taskKey: %s", taskKey)
s.Dao.dequeuedIncr()
log.Info(ctx, "dequeueTaskExecutions> work on taskKey: %s", taskKey)

// Load the task execution
var t = sdk.TaskExecution{}
Expand All @@ -286,18 +290,18 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {

task := s.Dao.FindTask(ctx, t.UUID)
if task == nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID)
log.Error(ctx, "dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID)
t.LastError = "Internal Error: Task not found"
t.NbErrors++
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
}
continue

} else if t.NbErrors >= s.Cfg.RetryError {
log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError)
log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError)
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
}
continue

Expand All @@ -307,19 +311,19 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
saveTaskExecution = true
} else {
saveTaskExecution = true
log.Debug("Hooks> dequeueTaskExecutions> call doTask on taskKey: %s", taskKey)
log.Debug("dequeueTaskExecutions> call doTask on taskKey: %s", taskKey)
var err error
restartTask, err = s.doTask(ctx, task, &t)
if err != nil {
if strings.Contains(err.Error(), "Unsupported task type") {
// delete this task execution, as it will never work
log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err)
log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err)
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
}
continue
} else {
log.Error(ctx, "Hooks> dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err)
log.Error(ctx, "dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err)
t.LastError = err.Error()
t.NbErrors++
saveTaskExecution = true
Expand All @@ -336,7 +340,10 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {

//Start (or restart) the task
if restartTask {
_, _ = s.startTask(ctx, task)
_, err := s.startTask(ctx, task)
if err != nil {
log.Error(ctx, "dequeueTaskExecutions> unable to restart the task %+v after execution: %v", err)
fsamin marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}