Skip to content

Commit

Permalink
feat: improve hatchery vsphere provisionning (#6592)
Browse files Browse the repository at this point in the history
richardlt authored Jul 26, 2023
1 parent 467ed5c commit 47f7b72
Showing 7 changed files with 117 additions and 53 deletions.
6 changes: 3 additions & 3 deletions engine/hatchery/vsphere/client.go
Original file line number Diff line number Diff line change
@@ -236,15 +236,15 @@ func (h *HatcheryVSphere) prepareCloneSpec(ctx context.Context, vm *object.Virtu

if len(h.availableIPAddresses) > 0 {
var err error
ip, err := h.findAvailableIP(ctx, workerName)
ip, err := h.findAvailableIP(ctx)
if err != nil {
return nil, sdk.WithStack(err)
return nil, err
}
log.Debug(ctx, "Found %s as available IP", ip)
// Once we found an IP Address, we have to reserve this IP in local memory
// because the IP address won't be used directly on the server
if err := h.reserveIPAddress(ctx, ip); err != nil {
return nil, sdk.WithStack(err)
return nil, err
}

customSpec.NicSettingMap = []types.CustomizationAdapterMapping{{
92 changes: 72 additions & 20 deletions engine/hatchery/vsphere/hatchery.go
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ package vsphere
import (
"context"
"fmt"
"github.com/ovh/cds/sdk/telemetry"
"strings"
"sync"
"time"

jwt "github.com/golang-jwt/jwt"
@@ -19,6 +19,7 @@ import (
"github.com/ovh/cds/sdk/cdsclient"
"github.com/ovh/cds/sdk/hatchery"
"github.com/ovh/cds/sdk/namesgenerator"
"github.com/ovh/cds/sdk/telemetry"
)

// New instantiates a new Hatchery vsphere
@@ -198,6 +199,13 @@ func (h *HatcheryVSphere) CanSpawn(ctx context.Context, model sdk.WorkerStarterW
}
}

// Check that at least one IP address is still available
if len(h.availableIPAddresses) > 0 {
if _, err := h.findAvailableIP(ctx); err != nil {
return false
}
}

return true
}

@@ -274,6 +282,9 @@ func (h *HatcheryVSphere) WorkersStarted(ctx context.Context) ([]string, error)
srvs := h.getVirtualMachines(ctx)
res := make([]string, 0, len(srvs))
for _, s := range srvs {
if strings.HasPrefix(s.Name, "provision-") {
continue
}
res = append(res, s.Name)
}
return res, nil
@@ -328,6 +339,9 @@ func (h *HatcheryVSphere) killAwolServers(ctx context.Context) {
if annot == nil {
continue
}
if annot.HatcheryName != h.Name() {
continue
}

var isMarkToDelete = h.isMarkedToDelete(s)
var isPoweredOff = s.Summary.Runtime.PowerState != types.VirtualMachinePowerStatePoweredOn
@@ -380,39 +394,36 @@ func (h *HatcheryVSphere) killAwolServers(ctx context.Context) {
}

func (h *HatcheryVSphere) provisioning(ctx context.Context) {
if len(h.Config.WorkerProvisioning) == 0 {
log.Debug(ctx, "provisioning is disabled")
return
}

if len(h.cacheProvisioning.pending) > 0 {
log.Debug(ctx, "provisioning is still on going")
return
}

h.cacheProvisioning.mu.Lock()

// Count existing provisioned machines for each model
var mapAlreadyProvisionned = make(map[string]int)
machines := h.getVirtualMachines(ctx)
for _, machine := range machines {
if !strings.HasPrefix(machine.Name, "provision-") {
continue
}
annot := getVirtualMachineCDSAnnotation(ctx, machine)
if annot == nil {
continue
}
// Provisionned machines are powered off
if annot.Provisioning && machine.Runtime.PowerState != types.VirtualMachinePowerStatePoweredOn {
if annot.HatcheryName != h.Name() {
continue
}
if annot.Provisioning {
mapAlreadyProvisionned[annot.WorkerModelPath] = mapAlreadyProvisionned[annot.WorkerModelPath] + 1
}
}

h.cacheProvisioning.mu.Unlock()

// Count provision to create for each model
mapToProvision := make(map[string]int)
mapModels := make(map[string]sdk.Model)
for i := range h.Config.WorkerProvisioning {
modelPath := h.Config.WorkerProvisioning[i].ModelPath
number := h.Config.WorkerProvisioning[i].Number

if number == 0 {
continue // If provisioning is disabled
if modelPath == "" || number == 0 {
continue
}

tuple := strings.Split(modelPath, "/")
@@ -435,21 +446,62 @@ func (h *HatcheryVSphere) provisioning(ctx context.Context) {

log.Info(ctx, "model %q provisioning: %d/%d", modelPath, mapAlreadyProvisionned[modelPath], number)

for i := 0; i < int(number)-mapAlreadyProvisionned[modelPath]; i++ {
mapModels[modelPath] = model
count := int(number) - mapAlreadyProvisionned[modelPath]
if count > 0 {
mapToProvision[modelPath] = count
}
}

// Distribute models in provision queue
countModelToProvision := len(mapToProvision)
if countModelToProvision == 0 {
return
}
poolSize := h.Config.WorkerProvisioningPoolSize
if poolSize == 0 {
poolSize = 1
}
var provisionQueue []string
for len(mapToProvision) > 0 {
for i := range h.Config.WorkerProvisioning {
modelPath := h.Config.WorkerProvisioning[i].ModelPath
count, ok := mapToProvision[modelPath]
if !ok {
continue
}
if count == 0 {
delete(mapToProvision, modelPath)
continue
}
provisionQueue = append(provisionQueue, modelPath)
mapToProvision[modelPath] = mapToProvision[modelPath] - 1
}
}

// Provision workers
wg := new(sync.WaitGroup)
for i := 0; i < len(provisionQueue) && i < poolSize; i++ {
modelPath := provisionQueue[i]
wg.Add(1)
go func() {
defer wg.Done()

workerName := namesgenerator.GenerateWorkerName(modelPath, "provision")

h.cacheProvisioning.mu.Lock()
h.cacheProvisioning.pending = append(h.cacheProvisioning.pending, workerName)
h.cacheProvisioning.mu.Unlock()

if err := h.ProvisionWorker(ctx, model, workerName); err != nil {
if err := h.ProvisionWorker(ctx, mapModels[modelPath], workerName); err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to provision model %q: %v", modelPath, err)
}

h.cacheProvisioning.mu.Lock()
h.cacheProvisioning.pending = sdk.DeleteFromArray(h.cacheProvisioning.pending, workerName)
h.cacheProvisioning.mu.Unlock()
}
}()
}
wg.Wait()
}
5 changes: 3 additions & 2 deletions engine/hatchery/vsphere/hatchery_test.go
Original file line number Diff line number Diff line change
@@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
sdkhatchery "github.com/ovh/cds/sdk/hatchery"
"strings"
"testing"
"time"

sdkhatchery "github.com/ovh/cds/sdk/hatchery"

"github.com/golang/mock/gomock"
"github.com/ovh/cds/engine/hatchery"
"github.com/ovh/cds/engine/service"
@@ -485,7 +486,7 @@ func TestHatcheryVSphere_provisioning_do_nothing(t *testing.T) {
},
}, {
ManagedEntity: mo.ManagedEntity{
Name: "provisionned_worker",
Name: "provision-worker",
},
Summary: types.VirtualMachineSummary{
Config: types.VirtualMachineConfigSummary{
35 changes: 22 additions & 13 deletions engine/hatchery/vsphere/init.go
Original file line number Diff line number Diff line change
@@ -36,21 +36,30 @@ func (h *HatcheryVSphere) InitHatchery(ctx context.Context) error {

killAwolServersTick := time.NewTicker(2 * time.Minute)
killDisabledWorkersTick := time.NewTicker(2 * time.Minute)
provisioningTick := time.NewTicker(2 * time.Minute)

h.GoRoutines.Run(ctx, "hatchery-vsphere-provisioning",
func(ctx context.Context) {
defer provisioningTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-provisioningTick.C:
h.provisioning(ctx)
if len(h.Config.WorkerProvisioning) > 0 {
log.Debug(ctx, "provisioning is enabled")

provisioningInterval := 2 * time.Minute
if h.Config.WorkerProvisioningInterval > 0 {
provisioningInterval = time.Duration(h.Config.WorkerProvisioningInterval) * time.Second
}

provisioningTick := time.NewTicker(provisioningInterval)
h.GoRoutines.Run(ctx, "hatchery-vsphere-provisioning",
func(ctx context.Context) {
defer provisioningTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-provisioningTick.C:
h.provisioning(ctx)
}
}
}
},
)
},
)
}

h.GoRoutines.Run(ctx, "hatchery-vsphere-kill-awol-servers",
func(ctx context.Context) {
5 changes: 3 additions & 2 deletions engine/hatchery/vsphere/ip.go
Original file line number Diff line number Diff line change
@@ -8,8 +8,8 @@ import (
"github.com/ovh/cds/sdk"
)

// for each ip in the range, look for the first free ones
func (h *HatcheryVSphere) findAvailableIP(ctx context.Context, workerName string) (string, error) {
// For each IP in the range, look for the first free one
func (h *HatcheryVSphere) findAvailableIP(ctx context.Context) (string, error) {
h.IpAddressesMutex.Lock()
defer h.IpAddressesMutex.Unlock()

@@ -45,6 +45,7 @@ func (h *HatcheryVSphere) findAvailableIP(ctx context.Context, workerName string
return ip, nil
}
}

return "", sdk.WithStack(errors.New("no IP address available"))
}

25 changes: 12 additions & 13 deletions engine/hatchery/vsphere/spawn.go
Original file line number Diff line number Diff line change
@@ -35,13 +35,12 @@ type annotation struct {
func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) (err error) {
ctx = context.WithValue(ctx, cdslog.AuthWorkerName, spawnArgs.WorkerName)
defer func() {
h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = sdk.DeleteFromArray(h.cachePendingJobID.list, spawnArgs.JobID)
h.cachePendingJobID.mu.Unlock()
if err != nil {
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "SpawnWorker %q from model %q: ERROR: %v", spawnArgs.WorkerName, spawnArgs.ModelName(), err)

h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = sdk.DeleteFromArray(h.cachePendingJobID.list, spawnArgs.JobID)
h.cachePendingJobID.mu.Unlock()
} else {
log.Info(ctx, "SpawnWorker %q from model %q: DONE", spawnArgs.WorkerName, spawnArgs.ModelName())
}
@@ -54,14 +53,7 @@ func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.Sp
if spawnArgs.JobID != "0" {
h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = append(h.cachePendingJobID.list, spawnArgs.JobID)
defer h.cachePendingJobID.mu.Unlock()

go func() {
time.Sleep(3 * time.Minute)
h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = sdk.DeleteFromArray(h.cachePendingJobID.list, spawnArgs.JobID)
h.cachePendingJobID.mu.Unlock()
}()
h.cachePendingJobID.mu.Unlock()
}

var vmTemplate *object.VirtualMachine
@@ -71,7 +63,10 @@ func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.Sp
log.Info(ctx, "creating virtual machine model %q", spawnArgs.Model.GetName())
vmTemplate, err = h.createVirtualMachineTemplate(ctx, spawnArgs.Model, spawnArgs.WorkerName)
if err != nil {
log.Error(ctx, "Unable to create VM Model: %v", err)
if sdk.Cause(err).Error() == "no IP address available" {
log.Warn(ctx, "unable to create VM Model: %v", err)
return nil
}
return err
}
}
@@ -154,6 +149,10 @@ func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.Sp

cloneSpec, err := h.prepareCloneSpec(ctx, vmTemplate, &annot, spawnArgs.WorkerName)
if err != nil {
if sdk.Cause(err).Error() == "no IP address available" {
log.Warn(ctx, "unable to create worker: %v", err)
return nil
}
return err
}

2 changes: 2 additions & 0 deletions engine/hatchery/vsphere/types.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@ type HatcheryConfiguration struct {
SubnetMask string `mapstructure:"subnetMask" toml:"subnetMask" default:"255.255.255.0" commented:"false" comment:"Subnet Mask" json:"subnetMask"`
WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"120" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"`
WorkerRegistrationTTL int `mapstructure:"workerRegistrationTTL" toml:"workerRegistrationTTL" commented:"false" comment:"Worker Registration TTL (minutes)" json:"workerRegistrationTTL"`
WorkerProvisioningInterval int `mapstructure:"workerProvisioningInterval" toml:"workerProvisioningInterval" commented:"true" comment:"Worker Provisioning interval (seconds)" json:"workerProvisioningInterval"`
WorkerProvisioningPoolSize int `mapstructure:"workerProvisioningPoolSize" toml:"workerProvisioningPoolSize" commented:"true" comment:"Worker Provisioning pool size" json:"workerProvisioningPoolSize"`
WorkerProvisioning []WorkerProvisioningConfig `mapstructure:"workerProvisioning" toml:"workerProvisioning" commented:"true" comment:"Worker Provisioning per model name" json:"workerProvisioning"`
GuestCredentials []GuestCredential `mapstructure:"guestCredentials" toml:"guestCredentials" commented:"true" comment:"List of Guest credentials" json:"-"`
}

0 comments on commit 47f7b72

Please sign in to comment.