Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve hatchery vsphere provisioning #6592

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions engine/hatchery/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,15 @@ func (h *HatcheryVSphere) prepareCloneSpec(ctx context.Context, vm *object.Virtu

if len(h.availableIPAddresses) > 0 {
var err error
ip, err := h.findAvailableIP(ctx, workerName)
ip, err := h.findAvailableIP(ctx)
if err != nil {
return nil, sdk.WithStack(err)
return nil, err
}
log.Debug(ctx, "Found %s as available IP", ip)
// Once we have found an IP address, we have to reserve it in local memory,
// because the IP address won't be in use on the server right away
if err := h.reserveIPAddress(ctx, ip); err != nil {
return nil, sdk.WithStack(err)
return nil, err
}

customSpec.NicSettingMap = []types.CustomizationAdapterMapping{{
Expand Down
92 changes: 72 additions & 20 deletions engine/hatchery/vsphere/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package vsphere
import (
"context"
"fmt"
"github.com/ovh/cds/sdk/telemetry"
"strings"
"sync"
"time"

jwt "github.com/golang-jwt/jwt"
Expand All @@ -19,6 +19,7 @@ import (
"github.com/ovh/cds/sdk/cdsclient"
"github.com/ovh/cds/sdk/hatchery"
"github.com/ovh/cds/sdk/namesgenerator"
"github.com/ovh/cds/sdk/telemetry"
)

// New instantiates a new Hatchery vsphere
Expand Down Expand Up @@ -198,6 +199,13 @@ func (h *HatcheryVSphere) CanSpawn(ctx context.Context, model sdk.WorkerStarterW
}
}

// Check if there is one ip available
if len(h.availableIPAddresses) > 0 {
if _, err := h.findAvailableIP(ctx); err != nil {
return false
}
}

return true
}

Expand Down Expand Up @@ -274,6 +282,9 @@ func (h *HatcheryVSphere) WorkersStarted(ctx context.Context) ([]string, error)
srvs := h.getVirtualMachines(ctx)
res := make([]string, 0, len(srvs))
for _, s := range srvs {
if strings.HasPrefix(s.Name, "provision-") {
continue
}
res = append(res, s.Name)
}
return res, nil
Expand Down Expand Up @@ -328,6 +339,9 @@ func (h *HatcheryVSphere) killAwolServers(ctx context.Context) {
if annot == nil {
continue
}
if annot.HatcheryName != h.Name() {
continue
}

var isMarkToDelete = h.isMarkedToDelete(s)
var isPoweredOff = s.Summary.Runtime.PowerState != types.VirtualMachinePowerStatePoweredOn
Expand Down Expand Up @@ -380,39 +394,36 @@ func (h *HatcheryVSphere) killAwolServers(ctx context.Context) {
}

func (h *HatcheryVSphere) provisioning(ctx context.Context) {
if len(h.Config.WorkerProvisioning) == 0 {
log.Debug(ctx, "provisioning is disabled")
return
}

if len(h.cacheProvisioning.pending) > 0 {
log.Debug(ctx, "provisioning is still on going")
return
}

h.cacheProvisioning.mu.Lock()

// Count existing provisioned machines for each model
var mapAlreadyProvisionned = make(map[string]int)
machines := h.getVirtualMachines(ctx)
for _, machine := range machines {
if !strings.HasPrefix(machine.Name, "provision-") {
continue
}
annot := getVirtualMachineCDSAnnotation(ctx, machine)
if annot == nil {
continue
}
// Provisioned machines are powered off
if annot.Provisioning && machine.Runtime.PowerState != types.VirtualMachinePowerStatePoweredOn {
if annot.HatcheryName != h.Name() {
continue
}
if annot.Provisioning {
mapAlreadyProvisionned[annot.WorkerModelPath] = mapAlreadyProvisionned[annot.WorkerModelPath] + 1
}
}

h.cacheProvisioning.mu.Unlock()

// Count provision to create for each model
mapToProvision := make(map[string]int)
mapModels := make(map[string]sdk.Model)
for i := range h.Config.WorkerProvisioning {
modelPath := h.Config.WorkerProvisioning[i].ModelPath
number := h.Config.WorkerProvisioning[i].Number

if number == 0 {
continue // If provisioning is disabled
if modelPath == "" || number == 0 {
continue
}

tuple := strings.Split(modelPath, "/")
Expand All @@ -435,21 +446,62 @@ func (h *HatcheryVSphere) provisioning(ctx context.Context) {

log.Info(ctx, "model %q provisioning: %d/%d", modelPath, mapAlreadyProvisionned[modelPath], number)

for i := 0; i < int(number)-mapAlreadyProvisionned[modelPath]; i++ {
mapModels[modelPath] = model
count := int(number) - mapAlreadyProvisionned[modelPath]
if count > 0 {
mapToProvision[modelPath] = count
}
}

// Distribute models in provision queue
countModelToProvision := len(mapToProvision)
if countModelToProvision == 0 {
return
}
poolSize := h.Config.WorkerProvisioningPoolSize
if poolSize == 0 {
poolSize = 1
}
var provisionQueue []string
for len(mapToProvision) > 0 {
for i := range h.Config.WorkerProvisioning {
modelPath := h.Config.WorkerProvisioning[i].ModelPath
count, ok := mapToProvision[modelPath]
if !ok {
continue
}
if count == 0 {
delete(mapToProvision, modelPath)
continue
}
provisionQueue = append(provisionQueue, modelPath)
mapToProvision[modelPath] = mapToProvision[modelPath] - 1
}
}

// Provision workers
wg := new(sync.WaitGroup)
for i := 0; i < len(provisionQueue) && i < poolSize; i++ {
modelPath := provisionQueue[i]
wg.Add(1)
go func() {
defer wg.Done()

workerName := namesgenerator.GenerateWorkerName(modelPath, "provision")

h.cacheProvisioning.mu.Lock()
h.cacheProvisioning.pending = append(h.cacheProvisioning.pending, workerName)
h.cacheProvisioning.mu.Unlock()

if err := h.ProvisionWorker(ctx, model, workerName); err != nil {
if err := h.ProvisionWorker(ctx, mapModels[modelPath], workerName); err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Error(ctx, "unable to provision model %q: %v", modelPath, err)
}

h.cacheProvisioning.mu.Lock()
h.cacheProvisioning.pending = sdk.DeleteFromArray(h.cacheProvisioning.pending, workerName)
h.cacheProvisioning.mu.Unlock()
}
}()
}
wg.Wait()
}
5 changes: 3 additions & 2 deletions engine/hatchery/vsphere/hatchery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
sdkhatchery "github.com/ovh/cds/sdk/hatchery"
"strings"
"testing"
"time"

sdkhatchery "github.com/ovh/cds/sdk/hatchery"

"github.com/golang/mock/gomock"
"github.com/ovh/cds/engine/hatchery"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -485,7 +486,7 @@ func TestHatcheryVSphere_provisioning_do_nothing(t *testing.T) {
},
}, {
ManagedEntity: mo.ManagedEntity{
Name: "provisionned_worker",
Name: "provision-worker",
},
Summary: types.VirtualMachineSummary{
Config: types.VirtualMachineConfigSummary{
Expand Down
35 changes: 22 additions & 13 deletions engine/hatchery/vsphere/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,30 @@ func (h *HatcheryVSphere) InitHatchery(ctx context.Context) error {

killAwolServersTick := time.NewTicker(2 * time.Minute)
killDisabledWorkersTick := time.NewTicker(2 * time.Minute)
provisioningTick := time.NewTicker(2 * time.Minute)

h.GoRoutines.Run(ctx, "hatchery-vsphere-provisioning",
func(ctx context.Context) {
defer provisioningTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-provisioningTick.C:
h.provisioning(ctx)
if len(h.Config.WorkerProvisioning) > 0 {
log.Debug(ctx, "provisioning is enabled")

provisioningInterval := 2 * time.Minute
if h.Config.WorkerProvisioningInterval > 0 {
provisioningInterval = time.Duration(h.Config.WorkerProvisioningInterval) * time.Second
}

provisioningTick := time.NewTicker(provisioningInterval)
h.GoRoutines.Run(ctx, "hatchery-vsphere-provisioning",
func(ctx context.Context) {
defer provisioningTick.Stop()
for {
select {
case <-ctx.Done():
return
case <-provisioningTick.C:
h.provisioning(ctx)
}
}
}
},
)
},
)
}

h.GoRoutines.Run(ctx, "hatchery-vsphere-kill-awol-servers",
func(ctx context.Context) {
Expand Down
5 changes: 3 additions & 2 deletions engine/hatchery/vsphere/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/ovh/cds/sdk"
)

// for each ip in the range, look for the first free ones
func (h *HatcheryVSphere) findAvailableIP(ctx context.Context, workerName string) (string, error) {
// For each IP in the range, look for the first free one
func (h *HatcheryVSphere) findAvailableIP(ctx context.Context) (string, error) {
h.IpAddressesMutex.Lock()
defer h.IpAddressesMutex.Unlock()

Expand Down Expand Up @@ -45,6 +45,7 @@ func (h *HatcheryVSphere) findAvailableIP(ctx context.Context, workerName string
return ip, nil
}
}

return "", sdk.WithStack(errors.New("no IP address available"))
}

Expand Down
25 changes: 12 additions & 13 deletions engine/hatchery/vsphere/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ type annotation struct {
func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) (err error) {
ctx = context.WithValue(ctx, cdslog.AuthWorkerName, spawnArgs.WorkerName)
defer func() {
h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = sdk.DeleteFromArray(h.cachePendingJobID.list, spawnArgs.JobID)
h.cachePendingJobID.mu.Unlock()
if err != nil {
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "SpawnWorker %q from model %q: ERROR: %v", spawnArgs.WorkerName, spawnArgs.ModelName(), err)

h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = sdk.DeleteFromArray(h.cachePendingJobID.list, spawnArgs.JobID)
h.cachePendingJobID.mu.Unlock()
} else {
log.Info(ctx, "SpawnWorker %q from model %q: DONE", spawnArgs.WorkerName, spawnArgs.ModelName())
}
Expand All @@ -54,14 +53,7 @@ func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.Sp
if spawnArgs.JobID != "0" {
h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = append(h.cachePendingJobID.list, spawnArgs.JobID)
defer h.cachePendingJobID.mu.Unlock()

go func() {
time.Sleep(3 * time.Minute)
h.cachePendingJobID.mu.Lock()
h.cachePendingJobID.list = sdk.DeleteFromArray(h.cachePendingJobID.list, spawnArgs.JobID)
h.cachePendingJobID.mu.Unlock()
}()
h.cachePendingJobID.mu.Unlock()
}

var vmTemplate *object.VirtualMachine
Expand All @@ -71,7 +63,10 @@ func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.Sp
log.Info(ctx, "creating virtual machine model %q", spawnArgs.Model.GetName())
vmTemplate, err = h.createVirtualMachineTemplate(ctx, spawnArgs.Model, spawnArgs.WorkerName)
if err != nil {
log.Error(ctx, "Unable to create VM Model: %v", err)
if sdk.Cause(err).Error() == "no IP address available" {
log.Warn(ctx, "unable to create VM Model: %v", err)
return nil
}
return err
}
}
Expand Down Expand Up @@ -154,6 +149,10 @@ func (h *HatcheryVSphere) SpawnWorker(ctx context.Context, spawnArgs hatchery.Sp

cloneSpec, err := h.prepareCloneSpec(ctx, vmTemplate, &annot, spawnArgs.WorkerName)
if err != nil {
if sdk.Cause(err).Error() == "no IP address available" {
log.Warn(ctx, "unable to create worker: %v", err)
return nil
}
return err
}

Expand Down
2 changes: 2 additions & 0 deletions engine/hatchery/vsphere/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type HatcheryConfiguration struct {
SubnetMask string `mapstructure:"subnetMask" toml:"subnetMask" default:"255.255.255.0" commented:"false" comment:"Subnet Mask" json:"subnetMask"`
WorkerTTL int `mapstructure:"workerTTL" toml:"workerTTL" default:"120" commented:"false" comment:"Worker TTL (minutes)" json:"workerTTL"`
WorkerRegistrationTTL int `mapstructure:"workerRegistrationTTL" toml:"workerRegistrationTTL" commented:"false" comment:"Worker Registration TTL (minutes)" json:"workerRegistrationTTL"`
WorkerProvisioningInterval int `mapstructure:"workerProvisioningInterval" toml:"workerProvisioningInterval" commented:"true" comment:"Worker Provisioning interval (seconds)" json:"workerProvisioningInterval"`
WorkerProvisioningPoolSize int `mapstructure:"workerProvisioningPoolSize" toml:"workerProvisioningPoolSize" commented:"true" comment:"Worker Provisioning pool size" json:"workerProvisioningPoolSize"`
WorkerProvisioning []WorkerProvisioningConfig `mapstructure:"workerProvisioning" toml:"workerProvisioning" commented:"true" comment:"Worker Provisioning per model name" json:"workerProvisioning"`
GuestCredentials []GuestCredential `mapstructure:"guestCredentials" toml:"guestCredentials" commented:"true" comment:"List of Guest credentials" json:"-"`
}
Expand Down