Skip to content

Commit

Permalink
feat(harchery:swarm): improve stability when sharing host (#6000)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Nov 22, 2021
1 parent af0a7e3 commit 3641057
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 133 deletions.
99 changes: 37 additions & 62 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw
_, next := telemetry.Span(ctx, "swarm.chooseDockerEngine")
for dname, dclient := range h.dockerClients {
ctxList, cancelList := context.WithTimeout(context.Background(), 3*time.Second)
containers, errc := dclient.ContainerList(ctxList, types.ContainerListOptions{All: true})
if errc != nil {
log.Error(ctx, "hatchery> swarm> SpawnWorker> unable to list containers on %s: %v", dname, errc)
containers, err := dclient.ContainerList(ctxList, types.ContainerListOptions{All: true})
if err != nil {
log.Error(ctx, "hatchery> swarm> SpawnWorker> unable to list containers on %s: %v", dname, err)
cancelList()
continue
}
Expand All @@ -223,7 +223,7 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw

var nbContainersFromHatchery int
for _, cont := range containers {
if _, ok := cont.Labels["hatchery"]; ok {
if hatcheryName, ok := cont.Labels[LabelHatchery]; ok && hatcheryName == h.Config.Name {
nbContainersFromHatchery++
}
}
Expand Down Expand Up @@ -313,9 +313,9 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw

//labels are used to make container cleanup easier. We "link" the service to its worker this way.
labels := map[string]string{
"service_worker": spawnArgs.WorkerName,
"service_name": serviceName,
"hatchery": h.Config.Name,
LabelServiceWorker: spawnArgs.WorkerName,
LabelServiceName: serviceName,
LabelHatchery: h.Config.Name,
}

if spawnArgs.JobID > 0 {
Expand Down Expand Up @@ -362,10 +362,10 @@ func (h *HatcherySwarm) SpawnWorker(ctx context.Context, spawnArgs hatchery.Spaw

//labels are used to make container cleanup easier
labels := map[string]string{
"worker_model_path": spawnArgs.Model.Group.Name + "/" + spawnArgs.Model.Name,
"worker_name": spawnArgs.WorkerName,
"worker_requirements": strings.Join(services, ","),
"hatchery": h.Config.Name,
LabelWorkerModelPath: spawnArgs.Model.Group.Name + "/" + spawnArgs.Model.Name,
LabelWorkerName: spawnArgs.WorkerName,
LabelWorkerRequirements: strings.Join(services, ","),
LabelHatchery: h.Config.Name,
}

// Add new options on hatchery swarm to allow advanced docker option such as addHost, priviledge, port mapping and so one: #4594
Expand Down Expand Up @@ -456,34 +456,25 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in
}
}
for dockerName, dockerClient := range h.dockerClients {
//List all containers to check if we can spawn a new one
cs, errList := h.getContainers(dockerClient, types.ContainerListOptions{All: true})
if errList != nil {
log.Error(ctx, "hatchery> swarm> CanSpawn> Unable to list containers on %s: %s", dockerName, errList)
// List all containers to check if we can spawn a new one
cs, err := h.getContainers(ctx, dockerClient, types.ContainerListOptions{All: true})
if err != nil {
log.Error(ctx, "hatchery> swarm> CanSpawn> Unable to list containers on %s: %s", dockerName, err)
continue
}

var nbContainersFromHatchery int
for _, cont := range cs {
if _, ok := cont.Labels["hatchery"]; ok {
nbContainersFromHatchery++
}
}
nbContainersFromHatchery := len(cs)

//List all workers
ws, errWList := h.getWorkerContainers(cs, types.ContainerListOptions{})
if errWList != nil {
log.Error(ctx, "hatchery> swarm> CanSpawn> Unable to list workers on %s: %s", dockerName, errWList)
continue
}
// List all workers containers
ws := cs.FilterWorkers()

//Checking the number of container on each docker engine
// Checking the number of container on each docker engine
if nbContainersFromHatchery >= dockerClient.MaxContainers {
log.Debug(ctx, "hatchery> swarm> CanSpawn> max containers reached on %s. current:%d max:%d", dockerName, nbContainersFromHatchery, dockerClient.MaxContainers)
continue
}

//Get links from requirements
// Get links from requirements
links := map[string]string{}
for _, r := range requirements {
if r.Type == sdk.ServiceRequirement {
Expand Down Expand Up @@ -512,33 +503,19 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in
return false
}

func (h *HatcherySwarm) getWorkerContainers(containers []types.Container, option types.ContainerListOptions) ([]types.Container, error) {
res := []types.Container{}
//We only count worker
for _, cont := range containers {
if _, ok := cont.Labels["worker_name"]; ok {
if hatch, ok := cont.Labels["hatchery"]; !ok || hatch == h.Config.Name {
res = append(res, cont)
}
}
}
return res, nil
}

// WorkersStarted returns the number of instances started but
// not necessarily register on CDS yet
func (h *HatcherySwarm) WorkersStarted(ctx context.Context) []string {
res := make([]string, 0)
for _, dockerClient := range h.dockerClients {
// get only started containers
containers, errList := h.getContainers(dockerClient, types.ContainerListOptions{All: true})
if errList != nil {
log.Error(ctx, "hatchery> swarm> WorkersStarted> Unable to list containers: %s", errList)
containers, err := h.getContainers(ctx, dockerClient, types.ContainerListOptions{All: true})
if err != nil {
log.Error(ctx, "hatchery> swarm> WorkersStarted> Unable to list containers: %s", err)
return nil
}
workers, _ := h.getWorkerContainers(containers, types.ContainerListOptions{})
workers := containers.FilterWorkers()
for _, w := range workers {
res = append(res, w.Labels["worker_name"])
res = append(res, w.Labels[LabelWorkerName])
}
}
return res
Expand Down Expand Up @@ -604,21 +581,19 @@ func (h *HatcherySwarm) routines(ctx context.Context) {
}
}

func (h *HatcherySwarm) listAwolWorkers(dockerClientName string, containers []types.Container) ([]types.Container, error) {
func (h *HatcherySwarm) listAwolWorkers(dockerClientName string, containers Containers) (Containers, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

apiworkers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return nil, sdk.WrapError(err, "Cannot get workers on %s", dockerClientName)
}

workers, errList := h.getWorkerContainers(containers, types.ContainerListOptions{All: true})
if errList != nil {
return nil, sdk.WrapError(err, "Cannot list containers on %s", dockerClientName)
}
workers := containers.FilterWorkers()

//Checking workers
oldContainers := []types.Container{}
oldContainers := make(Containers, 0, len(workers))
for _, c := range workers {
if !strings.Contains(c.Status, "Exited") && time.Now().Add(-3*time.Minute).Unix() < c.Created {
log.Debug(ctx, "hatchery> swarm> listAwolWorkers> container %s(status=%s) is too young", c.Names[0], c.Status)
Expand Down Expand Up @@ -657,10 +632,10 @@ func (h *HatcherySwarm) listAwolWorkers(dockerClientName string, containers []ty

func (h *HatcherySwarm) killAwolWorker(ctx context.Context) error {
for _, dockerClient := range h.dockerClients {
containers, errC := h.getContainers(dockerClient, types.ContainerListOptions{All: true})
if errC != nil {
log.Warn(ctx, "hatchery> swarm> killAwolWorker> Cannot list containers: %s on %s", errC, dockerClient.name)
return errC
containers, err := h.getContainers(ctx, dockerClient, types.ContainerListOptions{All: true})
if err != nil {
log.Warn(ctx, "hatchery> swarm> killAwolWorker> Cannot list containers: %s on %s", err, dockerClient.name)
return err
}

oldContainers, err := h.listAwolWorkers(dockerClient.name, containers)
Expand All @@ -687,16 +662,16 @@ func (h *HatcherySwarm) killAwolWorker(ctx context.Context) error {
// Checking services
for _, c := range containers {
// checks if the container is a service based on its labels
if c.Labels["service_worker"] == "" {
if c.Labels[LabelServiceWorker] == "" {
continue
}
// if the worker associated to this service is still alive do not kill the service
if _, workerStillAlive := mContainers[c.Labels["service_worker"]]; workerStillAlive {
if _, workerStillAlive := mContainers[c.Labels[LabelServiceWorker]]; workerStillAlive {
continue
}

if !strings.Contains(c.Status, "Exited") && time.Now().Add(-3*time.Minute).Unix() < c.Created {
log.Debug(ctx, "hatchery> swarm> killAwolWorker> container %s(status=%s) is too young - service associated to worker %s", c.Names[0], c.Status, c.Labels["service_worker"])
log.Debug(ctx, "hatchery> swarm> killAwolWorker> container %s(status=%s) is too young - service associated to worker %s", c.Names[0], c.Status, c.Labels[LabelServiceWorker])
continue
}

Expand All @@ -714,7 +689,7 @@ func (h *HatcherySwarm) killAwolWorker(ctx context.Context) error {
HatcheryName: h.ServiceName(),
RequirementID: jobIdentifiers.ServiceID,
RequirementName: c.Labels[hatchery.LabelServiceReqName],
WorkerName: c.Labels["service_worker"],
WorkerName: c.Labels[LabelServiceWorker],
},
ProjectKey: c.Labels[hatchery.LabelServiceProjectKey],
WorkflowName: c.Labels[hatchery.LabelServiceWorkflowName],
Expand Down
4 changes: 2 additions & 2 deletions engine/hatchery/swarm/swarm_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (h *HatcherySwarm) Status(ctx context.Context) *sdk.MonitoringStatus {
for dockerName, dockerClient := range h.dockerClients {
//Check images
status := sdk.MonitoringStatusOK
ctxList, cancelList := context.WithTimeout(context.Background(), 20*time.Second)
ctxList, cancelList := context.WithTimeout(ctx, 20*time.Second)
defer cancelList()
images, err := dockerClient.ImageList(ctxList, types.ImageListOptions{All: true})
if err != nil {
Expand All @@ -77,7 +77,7 @@ func (h *HatcherySwarm) Status(ctx context.Context) *sdk.MonitoringStatus {
m.AddLine(sdk.MonitoringStatusLine{Component: "Images-" + dockerName, Value: fmt.Sprintf("%d", len(images)), Status: status})
//Check containers
status = sdk.MonitoringStatusOK
cs, err := h.getContainers(dockerClient, types.ContainerListOptions{All: true})
cs, err := h.getContainers(ctx, dockerClient, types.ContainerListOptions{All: true})
if err != nil {
log.Warn(ctx, "hatchery> swarm> %s> Status> Unable to list containers on %s: %s", h.Name(), dockerName, err)
status = sdk.MonitoringStatusWarn
Expand Down
Loading

0 comments on commit 3641057

Please sign in to comment.