Skip to content

Commit

Permalink
feat(hatchery:k8s): worker config as secret (#6105)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardlt authored Mar 8, 2022
1 parent 2876a3c commit f0e0d92
Show file tree
Hide file tree
Showing 22 changed files with 332 additions and 175 deletions.
6 changes: 3 additions & 3 deletions engine/hatchery/kubernetes/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ func NewHatcheryKubernetesTest(t *testing.T) *HatcheryKubernetes {
h.kubeClient = &kubernetesClient{clientSet}
gock.InterceptClient(clientSet.CoreV1().RESTClient().(*rest.RESTClient).Client)

h.Config.Name = "kyubi"
h.Config.Namespace = "hachibi"
h.Config.Name = "my-hatchery"
h.Config.Namespace = "cds-workers"
h.ServiceInstance = &sdk.Service{
CanonicalService: sdk.CanonicalService{
ID: 1,
Name: "kyubi",
Name: "my-hatchery",
},
}
return h
Expand Down
40 changes: 34 additions & 6 deletions engine/hatchery/kubernetes/kill_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"context"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -17,20 +18,47 @@ import (
)

func (h *HatcheryKubernetes) killAwolWorkers(ctx context.Context) error {
pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_WORKER})
pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME),
})
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workers, err := h.CDSClient().WorkerList(ctx)
if err != nil {
return err
}

var globalErr error
for _, pod := range pods.Items {
annotations := pod.GetAnnotations()
labels := pod.GetLabels()
toDelete := false
for _, container := range pod.Status.ContainerStatuses {
if labels == nil {
continue
}

if (container.State.Terminated != nil && (container.State.Terminated.Reason == "Completed" || container.State.Terminated.Reason == "Error")) ||
(container.State.Waiting != nil && container.State.Waiting.Reason == "ErrImagePull") {
toDelete = true
var toDelete, found bool
for _, w := range workers {
if workerName, ok := labels[LABEL_WORKER_NAME]; ok && workerName == w.Name {
found = true
break
}
}
if !found {
toDelete = true
}

if !toDelete {
for _, container := range pod.Status.ContainerStatuses {
terminated := (container.State.Terminated != nil && (container.State.Terminated.Reason == "Completed" || container.State.Terminated.Reason == "Error"))
errImagePull := (container.State.Waiting != nil && container.State.Waiting.Reason == "ErrImagePull")
if terminated || errImagePull {
toDelete = true
break
}
}
}

Expand Down
57 changes: 45 additions & 12 deletions engine/hatchery/kubernetes/kill_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"gopkg.in/h2non/gock.v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ovh/cds/sdk"
)

func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
Expand All @@ -19,8 +21,12 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
Namespace: "kyubi",
Name: "worker-1",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-1",
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -36,15 +42,23 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "wrong",
Namespace: "kyubi",
Name: "worker-2",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-2",
},
},
Spec: v1.PodSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
Namespace: "kyubi",
Name: "worker-3",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-3",
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -60,8 +74,12 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "w3",
Namespace: "kyubi",
Name: "worker-4",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-4",
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -75,13 +93,28 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) {
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "worker-5",
Namespace: "cds-workers",
Labels: map[string]string{
LABEL_HATCHERY_NAME: "my-hatchery",
LABEL_WORKER_NAME: "worker-5",
},
},
},
},
}
gock.New("http://lolcat.kube").Get("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podsList)
gock.New("http://lolcat.kube").Get("/api/v1/namespaces/cds-workers/pods").Reply(http.StatusOK).JSON(podsList)

gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-1").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-2").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-3").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-4").Reply(http.StatusOK).JSON(nil)

gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w1").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w2").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w3").Reply(http.StatusOK).JSON(nil)
gock.New("http://lolcat.api").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{
Name: "worker-5",
}})

err := h.killAwolWorkers(context.TODO())
require.NoError(t, err)
Expand Down
87 changes: 51 additions & 36 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
"github.com/ovh/cds/sdk/hatchery"
"github.com/ovh/cds/sdk/slug"
)

// New instanciates a new hatchery local
Expand Down Expand Up @@ -99,8 +100,12 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error {
// Status returns sdk.MonitoringStatus, implements interface service.Service
func (h *HatcheryKubernetes) Status(ctx context.Context) *sdk.MonitoringStatus {
m := h.NewMonitoringStatus()
m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK})

ws, err := h.WorkersStarted(ctx)
if err != nil {
ctx = log.ContextWithStackTrace(ctx, err)
log.Warn(ctx, err.Error())
}
m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK})
return m
}

Expand Down Expand Up @@ -171,11 +176,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
return sdk.WithStack(fmt.Errorf("no job ID and no register"))
}

label := "execution"
if spawnArgs.RegisterOnly {
label = "register"
}

var logJob string
if spawnArgs.JobID > 0 {
logJob = fmt.Sprintf("for workflow job %d,", spawnArgs.JobID)
Expand Down Expand Up @@ -221,7 +221,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
envsWm := workerConfig.InjectEnvVars
envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory)
envsWm["CDS_FROM_WORKER_IMAGE"] = "true"
envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64()

for envName, envValue := range spawnArgs.Model.ModelDocker.Envs {
envsWm[envName] = envValue
Expand All @@ -239,16 +238,33 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
pullPolicy = "Always"
}

// Create secret for worker config
configSecretName, err := h.createConfigSecret(ctx, workerConfig)
if err != nil {
return sdk.WrapError(err, "cannot create secret for config %s", workerConfig.Name)
}
envs = append(envs, apiv1.EnvVar{
Name: "CDS_CONFIG",
ValueFrom: &apiv1.EnvVarSource{
SecretKeyRef: &apiv1.SecretKeySelector{
LocalObjectReference: apiv1.LocalObjectReference{
Name: configSecretName,
},
Key: "CDS_CONFIG",
},
},
})

var gracePeriodSecs int64
podSchema := apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: spawnArgs.WorkerName,
Namespace: h.Config.Namespace,
DeletionGracePeriodSeconds: &gracePeriodSecs,
Labels: map[string]string{
LABEL_WORKER: label,
LABEL_WORKER_MODEL: strings.ToLower(spawnArgs.Model.Name),
LABEL_HATCHERY_NAME: h.Configuration().Name,
LABEL_HATCHERY_NAME: h.Configuration().Name,
LABEL_WORKER_NAME: workerConfig.Name,
LABEL_WORKER_MODEL_PATH: slug.Convert(spawnArgs.Model.Path()),
},
Annotations: map[string]string{},
},
Expand All @@ -273,6 +289,15 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
},
}

// Check here to add secret if needed
if spawnArgs.Model.ModelDocker.Private {
secretRegistryName, err := h.createRegistrySecret(ctx, *spawnArgs.Model)
if err != nil {
return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path())
}
podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretRegistryName}}
}

var services []sdk.Requirement
for _, req := range spawnArgs.Requirements {
if req.Type == sdk.ServiceRequirement {
Expand All @@ -286,16 +311,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
podSchema.Spec.HostAliases[0].Hostnames[0] = "worker"
}

// Check here to add secret if needed
secretName := "cds-credreg-" + spawnArgs.Model.Name
if spawnArgs.Model.ModelDocker.Private {
if err := h.createSecret(ctx, secretName, *spawnArgs.Model); err != nil {
return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path())
}
podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretName}}
podSchema.ObjectMeta.Labels[LABEL_SECRET] = secretName
}

for i, serv := range services {
//name= <alias> => the name of the host put in /etc/hosts of the worker
//value= "postgres:latest env_1=blabla env_2=blabla"" => we can add env variables in requirement name
Expand Down Expand Up @@ -345,7 +360,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
podSchema.Spec.HostAliases[0].Hostnames[i+1] = strings.ToLower(serv.Name)
}

_, err := h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema, metav1.CreateOptions{})
_, err = h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema, metav1.CreateOptions{})
log.Debug(ctx, "hatchery> kubernetes> SpawnWorker> %s > Pod created", spawnArgs.WorkerName)
return sdk.WithStack(err)
}
Expand All @@ -356,20 +371,18 @@ func (h *HatcheryKubernetes) GetLogger() *logrus.Logger {

// WorkersStarted returns the number of instances started but
// not necessarily register on CDS yet
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) []string {
list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME})
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) ([]string, error) {
list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME),
})
if err != nil {
log.Warn(ctx, "WorkersStarted> unable to list pods on namespace %s", h.Config.Namespace)
return nil
return nil, sdk.WrapError(err, "unable to list pods on namespace %s", h.Config.Namespace)
}
workerNames := make([]string, 0, list.Size())
for _, pod := range list.Items {
labels := pod.GetLabels()
if labels[LABEL_HATCHERY_NAME] == h.Configuration().Name {
workerNames = append(workerNames, pod.GetName())
}
workerNames = append(workerNames, pod.GetName())
}
return workerNames
return workerNames, nil
}

// NeedRegistration return true if worker model need regsitration
Expand All @@ -381,31 +394,33 @@ func (h *HatcheryKubernetes) NeedRegistration(_ context.Context, m *sdk.Model) b
}

func (h *HatcheryKubernetes) routines(ctx context.Context) {
ticker := time.NewTicker(10 * time.Minute)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
h.GoRoutines.Exec(ctx, "getCDNConfiguration", func(ctx context.Context) {
if err := h.Common.RefreshServiceLogger(ctx); err != nil {
log.Error(ctx, "hatchery> kubernetes> cannot get cdn configuration : %v", err)
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get cdn configuration"))
}
})

h.GoRoutines.Exec(ctx, "getServicesLogs", func(ctx context.Context) {
if err := h.getServicesLogs(ctx); err != nil {
log.Error(ctx, "Hatchery> Kubernetes> Cannot get service logs : %v", err)
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get service logs"))
}
})

h.GoRoutines.Exec(ctx, "killAwolWorker", func(ctx context.Context) {
_ = h.killAwolWorkers(ctx)
if err := h.killAwolWorkers(ctx); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete awol worker"))
}
})

h.GoRoutines.Exec(ctx, "deleteSecrets", func(ctx context.Context) {
if err := h.deleteSecrets(ctx); err != nil {
log.Error(ctx, "hatchery> kubernetes> cannot handle secrets : %v", err)
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete secrets"))
}
})
case <-ctx.Done():
Expand Down
Loading

0 comments on commit f0e0d92

Please sign in to comment.