Skip to content

Commit

Permalink
fix(hatchery): priority for models registration (#6414)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Jan 11, 2023
1 parent 5292853 commit 8a4dc7c
Showing 1 changed file with 40 additions and 14 deletions.
54 changes: 40 additions & 14 deletions sdk/hatchery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hatchery

import (
"context"
"sort"
"strings"
"sync/atomic"

Expand Down Expand Up @@ -32,14 +33,39 @@ func workerRegister(ctx context.Context, h InterfaceWithModels, startWorkerChan
log.Error(ctx, "worker pool error: %v", err)
}

ms := make([]sdk.Model, len(models))
copy(ms, models)

// Sort to first register models with NeedRegistration
sort.Slice(ms, func(i, j int) bool {
modelI, modelJ := ms[i], ms[j]
var modelIPriority, modelJPriority int
if modelI.CheckRegistration {
modelIPriority++
}
if modelI.NeedRegistration {
modelIPriority += 2
}
if modelJ.CheckRegistration {
modelJPriority++
}
if modelJ.NeedRegistration {
modelJPriority += 2
}
if modelIPriority == modelJPriority {
return modelI.Name < modelJ.Name
}
return modelIPriority > modelJPriority
})

atomic.StoreInt64(&nbRegisteringWorkerModels, int64(len(currentRegistering)))
loopModels:
for k := range models {
if models[k].Type != h.ModelType() {
for k := range ms {
if ms[k].Type != h.ModelType() {
continue
}
if h.CanSpawn(ctx, &models[k], 0, nil) && (h.NeedRegistration(ctx, &models[k]) || models[k].CheckRegistration) {
log.Debug(ctx, "model %q need to register", models[k].Path())
if h.CanSpawn(ctx, &ms[k], 0, nil) && (h.NeedRegistration(ctx, &ms[k]) || ms[k].CheckRegistration) {
log.Debug(ctx, "model %q need to register", ms[k].Path())
} else {
continue
}
Expand All @@ -60,40 +86,40 @@ loopModels:

// Check if there is a pending registering worker
for _, w := range currentRegistering {
if strings.Contains(w.Name, models[k].Name) {
log.Info(ctx, "model %q is already registering (%s)", models[k].Name, w.Name)
if strings.Contains(w.Name, ms[k].Name) {
log.Info(ctx, "model %q is already registering (%s)", ms[k].Name, w.Name)
continue loopModels
}
}

// if current hatchery is in same group than worker model -> do not avoid spawn, even if worker model is in error
if models[k].NbSpawnErr > 5 {
log.Warn(ctx, "Too many errors on spawn with model %s, please check this worker model", models[k].Name)
if ms[k].NbSpawnErr > 5 {
log.Warn(ctx, "Too many errors on spawn with model %s, please check this worker model", ms[k].Name)
continue
}

if err := h.CDSClient().WorkerModelBook(models[k].Group.Name, models[k].Name); err != nil {
if err := h.CDSClient().WorkerModelBook(ms[k].Group.Name, ms[k].Name); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
if sdk.ErrorIs(err, sdk.ErrWorkerModelAlreadyBooked) {
log.Info(ctx, "worker model already booked. model %s with id %d: %v", models[k].Path(), models[k].ID, err)
log.Info(ctx, "worker model already booked. model %s with id %d: %v", ms[k].Path(), ms[k].ID, err)
} else {
log.Error(ctx, "cannot book model %s with id %d: %v", models[k].Path(), models[k].ID, err)
log.Error(ctx, "cannot book model %s with id %d: %v", ms[k].Path(), ms[k].ID, err)
}
continue
}

log.Info(ctx, "model %q (%d) has been booked and will be spawned for registration", models[k].Name, models[k].ID)

// Interpolate model secrets
if err := ModelInterpolateSecrets(h, &models[k]); err != nil {
if err := ModelInterpolateSecrets(h, &ms[k]); err != nil {
ctx := log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "cannot interpolate secrets for model %s: %v", models[k].Path(), err)
log.Error(ctx, "cannot interpolate secrets for model %s: %v", ms[k].Path(), err)
continue
}

//Ask for the creation
startWorkerChan <- workerStarterRequest{
registerWorkerModel: &models[k],
registerWorkerModel: &ms[k],
ctx: ctx,
cancel: end,
}
Expand Down

0 comments on commit 8a4dc7c

Please sign in to comment.