fix(hatchery): worker starter cache clean (#6727)
yesnault authored Dec 11, 2023
1 parent 4ef8fde commit 81f6c2a
Showing 6 changed files with 99 additions and 80 deletions.
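Context for the change: the hatchery keeps a cache of job IDs for which a worker spawn has been requested, so the queue pollers can skip jobs that are already being handled. Before this commit, entries were removed only when a worker was actually spawned; a job that exited early (already booked, no capacity, no matching model, ...) left a stale entry behind and was silently skipped on every later poll. The diff threads the job ID through the endTrace closures so each early exit clears its entry, marks a job as pending only when it is actually dispatched to the starter channel, and adds NbJobInPendingWorkerCreation so the debug logs report the real cache size. Below is a minimal sketch of the cache type, reconstructed from the calls visible in this diff; the map's value type is an assumption and the real type may differ.

package sdk

import "sync"

// HatcheryPendingWorkerCreation tracks job IDs with an in-flight
// worker-spawn request. Sketch reconstructed from the methods this
// diff touches; the value type of the map is an assumption.
type HatcheryPendingWorkerCreation struct {
	mapSpawnJobRequest      map[string]struct{}
	mapSpawnJobRequestMutex sync.Mutex
}

func (c *HatcheryPendingWorkerCreation) Init() {
	c.mapSpawnJobRequest = make(map[string]struct{})
}

func (c *HatcheryPendingWorkerCreation) IsJobAlreadyPendingWorkerCreation(id string) bool {
	c.mapSpawnJobRequestMutex.Lock()
	defer c.mapSpawnJobRequestMutex.Unlock()
	_, has := c.mapSpawnJobRequest[id]
	return has
}

// SetJobInPendingWorkerCreation marks a job and returns the new cache size.
func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string) int {
	c.mapSpawnJobRequestMutex.Lock()
	defer c.mapSpawnJobRequestMutex.Unlock()
	c.mapSpawnJobRequest[id] = struct{}{}
	return len(c.mapSpawnJobRequest)
}

func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id string) {
	c.mapSpawnJobRequestMutex.Lock()
	defer c.mapSpawnJobRequestMutex.Unlock()
	delete(c.mapSpawnJobRequest, id)
}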
6 changes: 3 additions & 3 deletions sdk/cdsclient/client_queue.go
@@ -143,20 +143,20 @@ func (c *client) QueuePolling(ctx context.Context, goRoutines *sdk.GoRoutines, h
 			cancel()

 			queueFiltered := sdk.WorkflowQueue{}
-			var lenqueue int
 			for _, job := range queue {
 				id := strconv.FormatInt(job.ID, 10)
 				if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(id) {
 					log.Debug(ctx, "skipping job %s", id)
 					continue
 				}
-				lenqueue = pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
 				queueFiltered = append(queueFiltered, job)
 			}
-			log.Debug(ctx, "v1_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), lenqueue)
+			log.Debug(ctx, "v1_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), pendingWorkerCreation.NbJobInPendingWorkerCreation())

 			shrinkQueue(&queueFiltered, cap(jobs))
 			for _, j := range queueFiltered {
+				id := strconv.FormatInt(j.ID, 10)
+				pendingWorkerCreation.SetJobInPendingWorkerCreation(id)
 				telemetry.Record(ctx, hatcheryMetrics.ChanV1JobAdd, 1)
 				jobs <- j
 			}
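The reordering above matters: SetJobInPendingWorkerCreation now runs only for jobs that survive shrinkQueue and are actually pushed to the jobs channel, so a polled-but-undispatched job can no longer get stuck in the cache. A simplified, hypothetical sketch of the resulting filter/shrink/mark shape (the real code adds telemetry and uses sdk types):

package main

import "fmt"

type job struct{ id string }

type pending map[string]struct{}

func (p pending) has(id string) bool { _, ok := p[id]; return ok }
func (p pending) set(id string)      { p[id] = struct{}{} }

func main() {
	queue := []job{{"1"}, {"2"}, {"3"}}
	p := pending{"2": {}} // a worker start is already in flight for job 2
	jobs := make(chan job, 1)

	filtered := make([]job, 0, len(queue))
	for _, j := range queue {
		if p.has(j.id) {
			continue // skip: already pending worker creation
		}
		filtered = append(filtered, j)
	}
	if max := cap(jobs); len(filtered) > max { // stands in for shrinkQueue
		filtered = filtered[:max]
	}
	for _, j := range filtered {
		p.set(j.id) // mark only the jobs actually dispatched
		jobs <- j
	}
	fmt.Println("dispatched:", len(jobs), "pending entries:", len(p))
}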
5 changes: 2 additions & 3 deletions sdk/cdsclient/client_queue_V2.go
@@ -167,23 +167,22 @@ func (c *client) V2QueuePolling(ctx context.Context, regionName string, goRoutin
 			cancel()

 			queueFiltered := []sdk.V2WorkflowRunJob{}
-			var lenqueue int
 			for _, job := range queue {
 				if pendingWorkerCreation.IsJobAlreadyPendingWorkerCreation(job.ID) {
 					log.Debug(ctx, "skipping job %s", job.ID)
 					continue
 				}
-				lenqueue = pendingWorkerCreation.SetJobInPendingWorkerCreation(job.ID)
 				queueFiltered = append(queueFiltered, job)
 			}

-			log.Debug(ctx, "v2_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), lenqueue)
+			log.Debug(ctx, "v2_job_queue_from_api: %v job_queue_filtered: %v len_queue: %v", len(queue), len(queueFiltered), pendingWorkerCreation.NbJobInPendingWorkerCreation())

 			max := cap(jobs) * 2
 			if len(queueFiltered) < max {
 				max = len(queueFiltered)
 			}
 			for i := 0; i < max; i++ {
+				pendingWorkerCreation.SetJobInPendingWorkerCreation(queueFiltered[i].ID)
 				telemetry.Record(ctx, hatcheryMetrics.ChanV2JobAdd, 1)
 				jobs <- queueFiltered[i]
 			}
7 changes: 7 additions & 0 deletions sdk/hatchery.go
@@ -87,6 +87,13 @@ func (c *HatcheryPendingWorkerCreation) SetJobInPendingWorkerCreation(id string)
 	return size
 }

+func (c *HatcheryPendingWorkerCreation) NbJobInPendingWorkerCreation() int {
+	c.mapSpawnJobRequestMutex.Lock()
+	size := len(c.mapSpawnJobRequest)
+	c.mapSpawnJobRequestMutex.Unlock()
+	return size
+}
+
 func (c *HatcheryPendingWorkerCreation) RemoveJobFromPendingWorkerCreation(id string) {
 	c.mapSpawnJobRequestMutex.Lock()
 	delete(c.mapSpawnJobRequest, id)
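NbJobInPendingWorkerCreation reads the map size under the same mutex as the writers, giving the pollers an accurate in-flight count instead of the stale lenqueue value the old code logged (the size as of the last Set call). A quick usage sketch; the import path is assumed to be the CDS repository root:

package main

import (
	"fmt"

	"github.com/ovh/cds/sdk"
)

func main() {
	p := &sdk.HatcheryPendingWorkerCreation{}
	p.Init()
	p.SetJobInPendingWorkerCreation("1")
	p.SetJobInPendingWorkerCreation("2")
	p.RemoveJobFromPendingWorkerCreation("1")
	fmt.Println(p.NbJobInPendingWorkerCreation()) // prints 1
}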
42 changes: 26 additions & 16 deletions sdk/hatchery/hatchery.go
@@ -163,6 +163,7 @@ func Create(ctx context.Context, h Interface) error {
 			if j.ID == 0 {
 				continue
 			}
+
 			currentCtx, currentCancel := context.WithTimeout(context.Background(), 10*time.Minute)
 			currentCtx = telemetry.ContextWithTag(currentCtx,
 				telemetry.TagServiceName, h.Name(),
@@ -196,7 +197,10 @@ func Create(ctx context.Context, h Interface) error {
 					)
 				}
 			}
-			endTrace := func(reason string) {
+			endTrace := func(reason string, jobID string) {
+				if jobID != "" {
+					h.GetMapPendingWorkerCreation().RemoveJobFromPendingWorkerCreation(jobID)
+				}
 				if currentCancel != nil {
 					currentCancel()
 				}
@@ -212,7 +216,7 @@ func Create(ctx context.Context, h Interface) error {
 			}
 			go func() {
 				<-currentCtx.Done()
-				endTrace(currentCtx.Err().Error())
+				endTrace(currentCtx.Err().Error(), "")
 			}()

 			stats.Record(currentCtx, GetMetrics().Jobs.M(1))
@@ -221,17 +225,17 @@ func Create(ctx context.Context, h Interface) error {
 				stats.Record(currentCtx, GetMetrics().JobsWebsocket.M(1))
 			}

-			//Check bookedBy current hatchery
+			// Check bookedBy current hatchery
 			if j.BookedBy.ID != 0 {
 				log.Debug(currentCtx, "hatchery> job %d is already booked", j.ID)
-				endTrace("booked by someone")
+				endTrace("booked by someone", strconv.FormatInt(j.ID, 10))
 				continue
 			}

 			//Check if hatchery is able to start a new worker
 			if !checkCapacities(currentCtx, h) {
-				log.Info(currentCtx, "hatchery %s is not able to provision new worker", h.Service().Name)
-				endTrace("no capacities")
+				log.Info(currentCtx, "hatchery %s is not able to provision new worker for job %v", h.Service().Name, j.ID)
+				endTrace("no capacities", strconv.FormatInt(j.ID, 10))
 				continue
 			}

@@ -270,11 +274,13 @@ func Create(ctx context.Context, h Interface) error {
 					modelPath := strings.Split(jobModel, "/")
 					if len(modelPath) >= 5 {
 						if h.CDSClientV2() == nil {
+							endTrace("no clientv2", strconv.FormatInt(j.ID, 10))
 							continue
 						}
 						chosenModel, err = canRunJobWithModelV2(currentCtx, hWithModels, workerRequest, jobModel)
 						if err != nil {
 							log.Error(currentCtx, "%v", err)
+							endTrace("err on chosenModel", strconv.FormatInt(j.ID, 10))
 							continue
 						}
 					} else {
@@ -289,7 +295,7 @@ func Create(ctx context.Context, h Interface) error {
 				// No model has been found, let's send a failing result
 				if chosenModel == nil {
 					log.Debug(currentCtx, "hatchery> no model")
-					endTrace("no model")
+					endTrace("no model", strconv.FormatInt(j.ID, 10))
 					continue
 				}
 				canTakeJob = true
@@ -302,7 +308,7 @@ func Create(ctx context.Context, h Interface) error {

 			if !canTakeJob {
 				log.Info(currentCtx, "hatchery %s is not able to run the job %d", h.Name(), j.ID)
-				endTrace("cannot run job")
+				endTrace("cannot run job", strconv.FormatInt(j.ID, 10))
 				continue
 			}

@@ -313,6 +319,7 @@ func Create(ctx context.Context, h Interface) error {
 				// Interpolate model secrets
 				if err := ModelInterpolateSecrets(hWithModels, chosenModel); err != nil {
 					log.Error(currentCtx, "%v", err)
+					endTrace("error on secret interpolation", strconv.FormatInt(j.ID, 10))
 					continue
 				}
 			}
@@ -338,7 +345,7 @@ func Create(ctx context.Context, h Interface) error {
 					log.ErrorWithStackTrace(currentCtx, err)
 				}
 				log.Info(currentCtx, "hatchery %q failed to start worker after %d attempts", h.Configuration().Name, maxAttemptsNumberBeforeFailure)
-				endTrace("maximum attempts")
+				endTrace("maximum attempts", strconv.FormatInt(j.ID, 10))
 				continue
 			}
 		}
@@ -368,7 +375,10 @@ func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAt
 		telemetry.Tag(telemetry.TagProjectKey, j.ProjectKey),
 		telemetry.Tag(telemetry.TagJob, j.JobID))

-	endTrace := func(reason string) {
+	endTrace := func(reason string, jobID string) {
+		if jobID != "" {
+			h.GetMapPendingWorkerCreation().RemoveJobFromPendingWorkerCreation(jobID)
+		}
 		if cancel != nil {
 			cancel()
 		}
@@ -384,7 +394,7 @@ func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAt
 	}
 	go func() {
 		<-ctx.Done()
-		endTrace(ctx.Err().Error())
+		endTrace(ctx.Err().Error(), "")
 	}()

 	fields := log.FieldValues(ctx)
@@ -400,7 +410,7 @@ func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAt
 	//Check if hatchery is able to start a new worker
 	if !checkCapacities(ctx, h) {
 		log.Info(ctx, "hatchery %s is not able to provision new worker", h.Service().Name)
-		endTrace("no capacities")
+		endTrace("no capacities", j.JobID)
 	}

 	workerRequest := workerStarterRequest{
@@ -415,19 +425,19 @@ func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAt
 	// Check at least one worker model can match
 	hWithModels, isWithModels := h.(InterfaceWithModels)
 	if isWithModels && j.Job.RunsOn == "" {
-		endTrace("no model")
+		endTrace("no model", j.JobID)
 		return nil
 	}

 	if hWithModels != nil {
 		workerModel, err := getWorkerModelV2(ctx, hWithModels, workerRequest, j.Job.RunsOn)
 		if err != nil {
-			endTrace(fmt.Sprintf("%v", err.Error()))
+			endTrace(fmt.Sprintf("%v", err.Error()), j.JobID)
 			return err
 		}
 		workerRequest.model = *workerModel
 		if !h.CanSpawn(ctx, *workerModel, j.ID, nil) {
-			endTrace("cannot spawn")
+			endTrace("cannot spawn", j.JobID)
 			return nil
 		}
 	}
@@ -447,7 +457,7 @@ func handleJobV2(_ context.Context, h Interface, j sdk.V2WorkflowRunJob, cacheAt
 			return err
 		}
 		log.Info(ctx, "hatchery %q failed to start worker after %d attempts", h.Configuration().Name, maxAttemptsNumberBeforeFailure)
-		endTrace("maximum attempts")
+		endTrace("maximum attempts", j.JobID)
 		return nil
 	}
 }
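Note the asymmetry in the endTrace calls above: explicit exit paths pass the job ID so the pending-creation entry is dropped, while the ctx.Done watcher passes an empty string, presumably because by the time it fires the entry has already been cleared by the explicit path (or by spawnWorkerForJob). A condensed, hypothetical sketch of the pattern:

package main

import (
	"context"
	"fmt"
	"time"
)

// pendingCache stands in for HatcheryPendingWorkerCreation (no mutex
// needed in this single-writer demo).
type pendingCache struct{ m map[string]struct{} }

func (p *pendingCache) remove(id string) { delete(p.m, id) }

func main() {
	pending := &pendingCache{m: map[string]struct{}{"42": {}}}
	ctx, cancel := context.WithCancel(context.Background())

	endTrace := func(reason string, jobID string) {
		if jobID != "" {
			pending.remove(jobID) // explicit exits clear the cache entry
		}
		cancel() // idempotent: safe to call from every path
		fmt.Println("end trace:", reason)
	}

	// Watcher: fires after any cancellation. It passes "" because the
	// explicit exit path has already cleared the cache entry.
	go func() {
		<-ctx.Done()
		endTrace(ctx.Err().Error(), "")
	}()

	endTrace("no capacities", "42")   // an explicit early-exit path
	time.Sleep(50 * time.Millisecond) // let the watcher print (demo only)
}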
2 changes: 1 addition & 1 deletion sdk/hatchery/hatchery_test.go
@@ -153,7 +153,7 @@ func TestCreate(t *testing.T) {

 	m := &sdk.HatcheryPendingWorkerCreation{}
 	m.Init()
-	mockHatchery.EXPECT().GetMapPendingWorkerCreation().Return(m).Times(3) // Three calls: one in QueuePolling and two RemoveJobFromPendingWorkerCreation() in spawnWorkerForJob
+	mockHatchery.EXPECT().GetMapPendingWorkerCreation().Return(m).Times(5) // Five calls: one in QueuePolling, two RemoveJobFromPendingWorkerCreation() in spawnWorkerForJob, and two in the main-routine endTrace

 	// These calls are expected for each job received in the channel
 	mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes()
117 changes: 60 additions & 57 deletions sdk/hatchery/starter.go
@@ -48,72 +48,75 @@ func startWorkerStarters(ctx context.Context, h Interface) chan<- workerStarterR
 func workerStarter(ctx context.Context, h Interface, workerNum string, jobs <-chan workerStarterRequest) {
 	for j := range jobs {
 		telemetry.Record(ctx, GetMetrics().ChanWorkerStarterPop, 1)
-		// Start a worker for a job
-		m := j.registerWorkerModel
-		if m == nil {
-			telemetry.Record(ctx, GetMetrics().SpawningWorkers, 1)
-			spawned := spawnWorkerForJob(j.ctx, h, j)
-			if spawned {
-				telemetry.Record(ctx, GetMetrics().SpawnedWorkers, 1)
-			} else {
-				telemetry.Record(ctx, GetMetrics().SpawningWorkersErrors, 1)
-			}
-			j.cancel() // call to EndTrace for observability
-			continue
-		}
+		workerStarterRunning(ctx, h, workerNum, j)
+	}
+}

-		// Start a worker for registering
-		log.Debug(ctx, "Spawning worker for register model %s", m.Name)
-		if atomic.LoadInt64(&nbWorkerToStart) > int64(h.Configuration().Provision.MaxConcurrentProvisioning) {
-			j.cancel() // call to EndTrace for observability
-			continue
-		}
+func workerStarterRunning(ctx context.Context, h Interface, workerNum string, j workerStarterRequest) {
+	m := j.registerWorkerModel
+	if m == nil { // Start a worker for a job
+		telemetry.Record(ctx, GetMetrics().SpawningWorkers, 1)
+		spawned := spawnWorkerForJob(j.ctx, h, j)
+		if spawned {
+			telemetry.Record(ctx, GetMetrics().SpawnedWorkers, 1)
+		} else {
+			telemetry.Record(ctx, GetMetrics().SpawningWorkersErrors, 1)
+		}
+		j.cancel() // call to EndTrace for observability
+		return
+	}

-		workerName := namesgenerator.GenerateWorkerName(m.Name, "register")
-
-		atomic.AddInt64(&nbWorkerToStart, 1)
-		// increment nbRegisteringWorkerModels, but no decrement.
-		// this counter is reset with func workerRegister
-		atomic.AddInt64(&nbRegisteringWorkerModels, 1)
-		arg := SpawnArguments{
-			WorkerName:   workerName,
-			Model:        sdk.WorkerStarterWorkerModel{ModelV1: m},
-			RegisterOnly: true,
-			JobID:        "0",
-			HatcheryName: h.Service().Name,
-		}
+	// Start a worker for registering
+	log.Debug(ctx, "Spawning worker for register model %s", m.Name)
+	if atomic.LoadInt64(&nbWorkerToStart) > int64(h.Configuration().Provision.MaxConcurrentProvisioning) {
+		j.cancel() // call to EndTrace for observability
+		return
+	}

-		ctx = context.WithValue(ctx, cdslog.AuthWorkerName, arg.WorkerName)
-		log.Info(ctx, "starting worker %q from model %q (register:%v)", arg.WorkerName, m.Name, arg.RegisterOnly)
+	workerName := namesgenerator.GenerateWorkerName(m.Name, "register")

-		// Get a JWT to authenticate the worker
-		jwt, err := NewWorkerToken(h.Service().Name, h.GetPrivateKey(), time.Now().Add(1*time.Hour), arg)
-		if err != nil {
-			ctx = sdk.ContextWithStacktrace(ctx, err)
-			var spawnError = sdk.SpawnErrorForm{
-				Error: fmt.Sprintf("cannot spawn worker for register: %v", err),
-			}
-			if err := h.CDSClient().WorkerModelSpawnError(m.Group.Name, m.Name, spawnError); err != nil {
-				log.Error(ctx, "workerStarter> error on call client.WorkerModelSpawnError on worker model %s for register: %s", m.Name, err)
-			}
-			j.cancel() // call to EndTrace for observability
-			continue
-		}
-		arg.WorkerToken = jwt
+	atomic.AddInt64(&nbWorkerToStart, 1)
+	// increment nbRegisteringWorkerModels, but no decrement.
+	// this counter is reset with func workerRegister
+	atomic.AddInt64(&nbRegisteringWorkerModels, 1)
+	arg := SpawnArguments{
+		WorkerName:   workerName,
+		Model:        sdk.WorkerStarterWorkerModel{ModelV1: m},
+		RegisterOnly: true,
+		JobID:        "0",
+		HatcheryName: h.Service().Name,
+	}

-		if err := h.SpawnWorker(j.ctx, arg); err != nil {
-			ctx = sdk.ContextWithStacktrace(ctx, err)
-			log.Warn(ctx, "workerRegister> cannot spawn worker for register:%s err:%v", m.Name, err)
-			var spawnError = sdk.SpawnErrorForm{
-				Error: fmt.Sprintf("cannot spawn worker for register: %v", err),
-			}
-			if err := h.CDSClient().WorkerModelSpawnError(m.Group.Name, m.Name, spawnError); err != nil {
-				log.Error(ctx, "workerRegister> error on call client.WorkerModelSpawnError on worker model %s for register: %s", m.Name, err)
-			}
-		}
+	ctx = context.WithValue(ctx, cdslog.AuthWorkerName, arg.WorkerName)
+	log.Info(ctx, "starting worker %q from model %q (register:%v)", arg.WorkerName, m.Name, arg.RegisterOnly)
+
+	// Get a JWT to authenticate the worker
+	jwt, err := NewWorkerToken(h.Service().Name, h.GetPrivateKey(), time.Now().Add(1*time.Hour), arg)
+	if err != nil {
+		ctx = sdk.ContextWithStacktrace(ctx, err)
+		var spawnError = sdk.SpawnErrorForm{
+			Error: fmt.Sprintf("cannot spawn worker for register: %v", err),
+		}
+		if err := h.CDSClient().WorkerModelSpawnError(m.Group.Name, m.Name, spawnError); err != nil {
+			log.Error(ctx, "workerStarter> error on call client.WorkerModelSpawnError on worker model %s for register: %s", m.Name, err)
+		}
+		atomic.AddInt64(&nbWorkerToStart, -1)
+		j.cancel() // call to EndTrace for observability
+		return
+	}
+	arg.WorkerToken = jwt
+
+	if err := h.SpawnWorker(j.ctx, arg); err != nil {
+		ctx = sdk.ContextWithStacktrace(ctx, err)
+		log.Warn(ctx, "workerRegister> cannot spawn worker for register:%s err:%v", m.Name, err)
+		var spawnError = sdk.SpawnErrorForm{
+			Error: fmt.Sprintf("cannot spawn worker for register: %v", err),
+		}
+		if err := h.CDSClient().WorkerModelSpawnError(m.Group.Name, m.Name, spawnError); err != nil {
+			log.Error(ctx, "workerRegister> error on call client.WorkerModelSpawnError on worker model %s for register: %s", m.Name, err)
+		}
+	}
+	atomic.AddInt64(&nbWorkerToStart, -1)
+	j.cancel() // call to EndTrace for observability
+}

 func spawnWorkerForJob(ctx context.Context, h Interface, j workerStarterRequest) bool {
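The starter.go change is mostly a mechanical extraction: the loop body of workerStarter moves into the new workerStarterRunning, so each early exit becomes a return instead of a continue, and the register path now decrements nbWorkerToStart both on the token-error path and at the end (the old code, at least on the lines shown here, incremented the counter with no matching decrement). A hypothetical minimal example of the same extract-loop-body refactor; note the CDS code calls j.cancel() explicitly rather than deferring:

package main

import "fmt"

type request struct {
	id     int
	cancel func()
}

// process consumes requests; the per-request logic lives in handleOne,
// mirroring the workerStarter -> workerStarterRunning split.
func process(reqs <-chan request) {
	for r := range reqs {
		handleOne(r)
	}
}

// handleOne can use return (and defer) for every early exit, where an
// inline loop body had to use continue and repeat its cleanup.
func handleOne(r request) {
	defer r.cancel() // cleanup runs on every exit path
	if r.id == 0 {
		return
	}
	fmt.Println("handling request", r.id)
}

func main() {
	reqs := make(chan request, 2)
	reqs <- request{id: 0, cancel: func() { fmt.Println("cancel 0") }}
	reqs <- request{id: 7, cancel: func() { fmt.Println("cancel 7") }}
	close(reqs)
	process(reqs)
}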
