diff --git a/engine/hatchery/kubernetes/helper_test.go b/engine/hatchery/kubernetes/helper_test.go index 4d6673057e..05ca17ce15 100644 --- a/engine/hatchery/kubernetes/helper_test.go +++ b/engine/hatchery/kubernetes/helper_test.go @@ -23,12 +23,12 @@ func NewHatcheryKubernetesTest(t *testing.T) *HatcheryKubernetes { h.kubeClient = &kubernetesClient{clientSet} gock.InterceptClient(clientSet.CoreV1().RESTClient().(*rest.RESTClient).Client) - h.Config.Name = "kyubi" - h.Config.Namespace = "hachibi" + h.Config.Name = "my-hatchery" + h.Config.Namespace = "cds-workers" h.ServiceInstance = &sdk.Service{ CanonicalService: sdk.CanonicalService{ ID: 1, - Name: "kyubi", + Name: "my-hatchery", }, } return h diff --git a/engine/hatchery/kubernetes/kill_workers.go b/engine/hatchery/kubernetes/kill_workers.go index e931549b33..fe0e10c13e 100644 --- a/engine/hatchery/kubernetes/kill_workers.go +++ b/engine/hatchery/kubernetes/kill_workers.go @@ -2,6 +2,7 @@ package kubernetes import ( "context" + "fmt" "strconv" "strings" "time" @@ -17,20 +18,47 @@ import ( ) func (h *HatcheryKubernetes) killAwolWorkers(ctx context.Context) error { - pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_WORKER}) + pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME), + }) if err != nil { return err } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workers, err := h.CDSClient().WorkerList(ctx) + if err != nil { + return err + } + var globalErr error for _, pod := range pods.Items { annotations := pod.GetAnnotations() labels := pod.GetLabels() - toDelete := false - for _, container := range pod.Status.ContainerStatuses { + if labels == nil { + continue + } - if (container.State.Terminated != nil && (container.State.Terminated.Reason == "Completed" || container.State.Terminated.Reason == "Error")) || - (container.State.Waiting != nil && container.State.Waiting.Reason == "ErrImagePull") { - toDelete = true + var toDelete, found bool + for _, w := range workers { + if workerName, ok := labels[LABEL_WORKER_NAME]; ok && workerName == w.Name { + found = true + break + } + } + if !found { + toDelete = true + } + + if !toDelete { + for _, container := range pod.Status.ContainerStatuses { + terminated := (container.State.Terminated != nil && (container.State.Terminated.Reason == "Completed" || container.State.Terminated.Reason == "Error")) + errImagePull := (container.State.Waiting != nil && container.State.Waiting.Reason == "ErrImagePull") + if terminated || errImagePull { + toDelete = true + break + } } } diff --git a/engine/hatchery/kubernetes/kill_workers_test.go b/engine/hatchery/kubernetes/kill_workers_test.go index e559d79eeb..8888bff74e 100644 --- a/engine/hatchery/kubernetes/kill_workers_test.go +++ b/engine/hatchery/kubernetes/kill_workers_test.go @@ -9,6 +9,8 @@ import ( "gopkg.in/h2non/gock.v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/ovh/cds/sdk" ) func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) { @@ -19,8 +21,12 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) { Items: []v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ - Name: "w1", - Namespace: "kyubi", + Name: "worker-1", + Namespace: "cds-workers", + Labels: map[string]string{ + LABEL_HATCHERY_NAME: "my-hatchery", + LABEL_WORKER_NAME: "worker-1", + }, }, Status: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ @@ -36,15 +42,23 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "wrong", - Namespace: "kyubi", + Name: "worker-2", + Namespace: "cds-workers", + Labels: map[string]string{ + LABEL_HATCHERY_NAME: "my-hatchery", + LABEL_WORKER_NAME: "worker-2", + }, }, Spec: v1.PodSpec{}, }, { ObjectMeta: metav1.ObjectMeta{ - Name: "w2", - Namespace: "kyubi", + Name: "worker-3", + Namespace: "cds-workers", + Labels: map[string]string{ + LABEL_HATCHERY_NAME: "my-hatchery", + LABEL_WORKER_NAME: "worker-3", + }, }, Status: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ @@ -60,8 +74,12 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - Name: "w3", - Namespace: "kyubi", + Name: "worker-4", + Namespace: "cds-workers", + Labels: map[string]string{ + LABEL_HATCHERY_NAME: "my-hatchery", + LABEL_WORKER_NAME: "worker-4", + }, }, Status: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ @@ -75,13 +93,28 @@ func TestHatcheryKubernetes_KillAwolWorkers(t *testing.T) { }, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "worker-5", + Namespace: "cds-workers", + Labels: map[string]string{ + LABEL_HATCHERY_NAME: "my-hatchery", + LABEL_WORKER_NAME: "worker-5", + }, + }, + }, }, } - gock.New("http://lolcat.kube").Get("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podsList) + gock.New("http://lolcat.kube").Get("/api/v1/namespaces/cds-workers/pods").Reply(http.StatusOK).JSON(podsList) + + gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-1").Reply(http.StatusOK).JSON(nil) + gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-2").Reply(http.StatusOK).JSON(nil) + gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-3").Reply(http.StatusOK).JSON(nil) + gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/pods/worker-4").Reply(http.StatusOK).JSON(nil) - gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w1").Reply(http.StatusOK).JSON(nil) - gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w2").Reply(http.StatusOK).JSON(nil) - gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/kyubi/pods/w3").Reply(http.StatusOK).JSON(nil) + gock.New("http://lolcat.api").Get("/worker").Reply(http.StatusOK).JSON([]sdk.Worker{{ + Name: "worker-5", + }}) err := h.killAwolWorkers(context.TODO()) require.NoError(t, err) diff --git a/engine/hatchery/kubernetes/kubernetes.go b/engine/hatchery/kubernetes/kubernetes.go index 870110959d..f0fc5a3534 100644 --- a/engine/hatchery/kubernetes/kubernetes.go +++ b/engine/hatchery/kubernetes/kubernetes.go @@ -24,6 +24,7 @@ import ( "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" "github.com/ovh/cds/sdk/hatchery" + "github.com/ovh/cds/sdk/slug" ) // New instanciates a new hatchery local @@ -99,8 +100,12 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error { // Status returns sdk.MonitoringStatus, implements interface service.Service func (h *HatcheryKubernetes) Status(ctx context.Context) *sdk.MonitoringStatus { m := h.NewMonitoringStatus() - m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) - + ws, err := h.WorkersStarted(ctx) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Warn(ctx, err.Error()) + } + m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) return m } @@ -171,11 +176,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery return sdk.WithStack(fmt.Errorf("no job ID and no register")) } - label := "execution" - if spawnArgs.RegisterOnly { - label = "register" - } - var logJob string if spawnArgs.JobID > 0 { logJob = fmt.Sprintf("for workflow job %d,", spawnArgs.JobID) @@ -221,7 +221,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery envsWm := workerConfig.InjectEnvVars envsWm["CDS_MODEL_MEMORY"] = fmt.Sprintf("%d", memory) envsWm["CDS_FROM_WORKER_IMAGE"] = "true" - envsWm["CDS_CONFIG"] = workerConfig.EncodeBase64() for envName, envValue := range spawnArgs.Model.ModelDocker.Envs { envsWm[envName] = envValue @@ -239,6 +238,23 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery pullPolicy = "Always" } + // Create secret for worker config + configSecretName, err := h.createConfigSecret(ctx, workerConfig) + if err != nil { + return sdk.WrapError(err, "cannot create secret for config %s", workerConfig.Name) + } + envs = append(envs, apiv1.EnvVar{ + Name: "CDS_CONFIG", + ValueFrom: &apiv1.EnvVarSource{ + SecretKeyRef: &apiv1.SecretKeySelector{ + LocalObjectReference: apiv1.LocalObjectReference{ + Name: configSecretName, + }, + Key: "CDS_CONFIG", + }, + }, + }) + var gracePeriodSecs int64 podSchema := apiv1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -246,9 +262,9 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery Namespace: h.Config.Namespace, DeletionGracePeriodSeconds: &gracePeriodSecs, Labels: map[string]string{ - LABEL_WORKER: label, - LABEL_WORKER_MODEL: strings.ToLower(spawnArgs.Model.Name), - LABEL_HATCHERY_NAME: h.Configuration().Name, + LABEL_HATCHERY_NAME: h.Configuration().Name, + LABEL_WORKER_NAME: workerConfig.Name, + LABEL_WORKER_MODEL_PATH: slug.Convert(spawnArgs.Model.Path()), }, Annotations: map[string]string{}, }, @@ -273,6 +289,15 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery }, } + // Check here to add secret if needed + if spawnArgs.Model.ModelDocker.Private { + secretRegistryName, err := h.createRegistrySecret(ctx, *spawnArgs.Model) + if err != nil { + return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path()) + } + podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretRegistryName}} + } + var services []sdk.Requirement for _, req := range spawnArgs.Requirements { if req.Type == sdk.ServiceRequirement { @@ -286,16 +311,6 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery podSchema.Spec.HostAliases[0].Hostnames[0] = "worker" } - // Check here to add secret if needed - secretName := "cds-credreg-" + spawnArgs.Model.Name - if spawnArgs.Model.ModelDocker.Private { - if err := h.createSecret(ctx, secretName, *spawnArgs.Model); err != nil { - return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path()) - } - podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretName}} - podSchema.ObjectMeta.Labels[LABEL_SECRET] = secretName - } - for i, serv := range services { //name= => the name of the host put in /etc/hosts of the worker //value= "postgres:latest env_1=blabla env_2=blabla"" => we can add env variables in requirement name @@ -345,7 +360,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery podSchema.Spec.HostAliases[0].Hostnames[i+1] = strings.ToLower(serv.Name) } - _, err := h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema, metav1.CreateOptions{}) + _, err = h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema, metav1.CreateOptions{}) log.Debug(ctx, "hatchery> kubernetes> SpawnWorker> %s > Pod created", spawnArgs.WorkerName) return sdk.WithStack(err) } @@ -356,20 +371,18 @@ func (h *HatcheryKubernetes) GetLogger() *logrus.Logger { // WorkersStarted returns the number of instances started but // not necessarily register on CDS yet -func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) []string { - list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME}) +func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) ([]string, error) { + list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME), + }) if err != nil { - log.Warn(ctx, "WorkersStarted> unable to list pods on namespace %s", h.Config.Namespace) - return nil + return nil, sdk.WrapError(err, "unable to list pods on namespace %s", h.Config.Namespace) } workerNames := make([]string, 0, list.Size()) for _, pod := range list.Items { - labels := pod.GetLabels() - if labels[LABEL_HATCHERY_NAME] == h.Configuration().Name { - workerNames = append(workerNames, pod.GetName()) - } + workerNames = append(workerNames, pod.GetName()) } - return workerNames + return workerNames, nil } // NeedRegistration return true if worker model need regsitration @@ -381,7 +394,7 @@ func (h *HatcheryKubernetes) NeedRegistration(_ context.Context, m *sdk.Model) b } func (h *HatcheryKubernetes) routines(ctx context.Context) { - ticker := time.NewTicker(10 * time.Minute) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { @@ -389,23 +402,25 @@ func (h *HatcheryKubernetes) routines(ctx context.Context) { case <-ticker.C: h.GoRoutines.Exec(ctx, "getCDNConfiguration", func(ctx context.Context) { if err := h.Common.RefreshServiceLogger(ctx); err != nil { - log.Error(ctx, "hatchery> kubernetes> cannot get cdn configuration : %v", err) + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get cdn configuration")) } }) h.GoRoutines.Exec(ctx, "getServicesLogs", func(ctx context.Context) { if err := h.getServicesLogs(ctx); err != nil { - log.Error(ctx, "Hatchery> Kubernetes> Cannot get service logs : %v", err) + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot get service logs")) } }) h.GoRoutines.Exec(ctx, "killAwolWorker", func(ctx context.Context) { - _ = h.killAwolWorkers(ctx) + if err := h.killAwolWorkers(ctx); err != nil { + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete awol worker")) + } }) h.GoRoutines.Exec(ctx, "deleteSecrets", func(ctx context.Context) { if err := h.deleteSecrets(ctx); err != nil { - log.Error(ctx, "hatchery> kubernetes> cannot handle secrets : %v", err) + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete secrets")) } }) case <-ctx.Done(): diff --git a/engine/hatchery/kubernetes/kubernetes_test.go b/engine/hatchery/kubernetes/kubernetes_test.go index ebb2c2dda4..9b879e6105 100644 --- a/engine/hatchery/kubernetes/kubernetes_test.go +++ b/engine/hatchery/kubernetes/kubernetes_test.go @@ -5,9 +5,9 @@ import ( "encoding/json" "io" "net/http" + "strings" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/h2non/gock.v1" v1 "k8s.io/api/core/v1" @@ -27,16 +27,7 @@ func TestHatcheryKubernetes_WorkersStarted(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "w1", Labels: map[string]string{ - LABEL_HATCHERY_NAME: "kyubi", - }, - }, - Spec: v1.PodSpec{}, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "wrong", - Labels: map[string]string{ - LABEL_HATCHERY_NAME: "jubi", + LABEL_HATCHERY_NAME: "my-hatchery", }, }, Spec: v1.PodSpec{}, @@ -45,16 +36,17 @@ func TestHatcheryKubernetes_WorkersStarted(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "w2", Labels: map[string]string{ - LABEL_HATCHERY_NAME: "kyubi", + LABEL_HATCHERY_NAME: "my-hatchery", }, }, Spec: v1.PodSpec{}, }, }, } - gock.New("http://lolcat.kube").Get("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podsList) + gock.New("http://lolcat.kube").Get("/api/v1/namespaces/cds-workers/pods").Reply(http.StatusOK).JSON(podsList) - ws := h.WorkersStarted(context.TODO()) + ws, err := h.WorkersStarted(context.TODO()) + require.NoError(t, err) require.Equal(t, 2, len(ws)) require.Equal(t, "w1", ws[0]) require.Equal(t, "w2", ws[1]) @@ -65,7 +57,7 @@ func TestHatcheryKubernetes_Status(t *testing.T) { defer gock.Off() defer gock.Observe(nil) h := NewHatcheryKubernetesTest(t) - h.Config.HatcheryCommonConfiguration.Provision.InjectEnvVars = []string{"ZZZZ=ZZZZ"} + h.Config.HatcheryCommonConfiguration.Provision.InjectEnvVars = []string{"PROVISION_ENV=MYVALUE"} m := &sdk.Model{ Name: "model1", @@ -74,49 +66,64 @@ func TestHatcheryKubernetes_Status(t *testing.T) { }, } - podResponse := v1.Pod{} - gock.New("http://lolcat.kube").Post("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podResponse) + gock.New("http://lolcat.kube").Delete("/api/v1/namespaces/cds-workers/secrets/cds-worker-config-my-worker").Reply(http.StatusOK) + gock.New("http://lolcat.kube").Post("/api/v1/namespaces/cds-workers/secrets").Reply(http.StatusOK).JSON(v1.Pod{}) + gock.New("http://lolcat.kube").Post("/api/v1/namespaces/cds-workers/pods").Reply(http.StatusOK).JSON(v1.Pod{}) - var checkRequest gock.ObserverFunc = func(request *http.Request, mock gock.Mock) { - if request.Body == nil { - return - } + gock.Observe(func(request *http.Request, mock gock.Mock) { + t.Logf("%s %s", request.URL, request.Method) bodyContent, err := io.ReadAll(request.Body) - assert.NoError(t, err) - var podRequest v1.Pod - require.NoError(t, json.Unmarshal(bodyContent, &podRequest)) - + require.NoError(t, err) t.Logf("%s", string(bodyContent)) - require.Equal(t, "hachibi", podRequest.ObjectMeta.Namespace) - require.Equal(t, "kyubi", podRequest.Labels["CDS_HATCHERY_NAME"]) - require.Equal(t, "666", podRequest.Labels[hatchery.LabelServiceJobID]) - require.Equal(t, "999", podRequest.Labels[hatchery.LabelServiceNodeRunID]) - require.Equal(t, "execution", podRequest.Labels["CDS_WORKER"]) - require.Equal(t, "model1", podRequest.Labels["CDS_WORKER_MODEL"]) - - require.Equal(t, 2, len(podRequest.Spec.Containers)) - require.Equal(t, "k8s-toto", podRequest.Spec.Containers[0].Name) - require.Equal(t, int64(4096), podRequest.Spec.Containers[0].Resources.Requests.Memory().Value()) - var zzzzFound bool - for _, env := range podRequest.Spec.Containers[0].Env { - if "ZZZZ" == env.Name && "ZZZZ" == env.Value { - zzzzFound = true - break + + if request.Method == http.MethodPost && strings.HasPrefix(request.URL.String(), "http://lolcat.kube/api/v1/namespaces/cds-workers/secrets") { + var secretRequest v1.Secret + require.NoError(t, json.Unmarshal(bodyContent, &secretRequest)) + + require.Equal(t, "Secret", secretRequest.Kind) + require.Equal(t, "cds-worker-config-my-worker", secretRequest.Name) + require.Equal(t, "my-hatchery", secretRequest.Labels[LABEL_HATCHERY_NAME]) + require.Equal(t, "my-worker", secretRequest.Labels[LABEL_WORKER_NAME]) + } + + if request.Method == http.MethodPost && strings.HasPrefix(request.URL.String(), "http://lolcat.kube/api/v1/namespaces/cds-workers/pods") { + var podRequest v1.Pod + require.NoError(t, json.Unmarshal(bodyContent, &podRequest)) + + require.Equal(t, "Pod", podRequest.Kind) + require.Equal(t, "cds-workers", podRequest.ObjectMeta.Namespace) + require.Equal(t, "my-hatchery", podRequest.Labels[LABEL_HATCHERY_NAME]) + require.Equal(t, "666", podRequest.Labels[hatchery.LabelServiceJobID]) + require.Equal(t, "999", podRequest.Labels[hatchery.LabelServiceNodeRunID]) + require.Equal(t, "my-worker", podRequest.Labels[LABEL_WORKER_NAME]) + require.Equal(t, "group-model1", podRequest.Labels[LABEL_WORKER_MODEL_PATH]) + + require.Equal(t, 2, len(podRequest.Spec.Containers)) + require.Equal(t, "my-worker", podRequest.Spec.Containers[0].Name) + require.Equal(t, int64(4096), podRequest.Spec.Containers[0].Resources.Requests.Memory().Value()) + var foundEnv, foundSecret bool + for _, env := range podRequest.Spec.Containers[0].Env { + if env.Name == "PROVISION_ENV" && env.Value == "MYVALUE" { + foundEnv = true + } + if env.Name == "CDS_CONFIG" && env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil && env.ValueFrom.SecretKeyRef.Name == "cds-worker-config-my-worker" { + foundSecret = true + } } + require.True(t, foundEnv, "\"PROVISION_ENV\" not found in env variables") + require.True(t, foundSecret, "\"CDS_CONFIG\" not found in env variables") + require.Equal(t, "service-0-pg", podRequest.Spec.Containers[1].Name) + require.Equal(t, 1, len(podRequest.Spec.Containers[1].Env)) + require.Equal(t, "PG_USERNAME", podRequest.Spec.Containers[1].Env[0].Name) + require.Equal(t, "username", podRequest.Spec.Containers[1].Env[0].Value) } - require.True(t, zzzzFound, "\"ZZZZ\" not found in env variables") - require.Equal(t, "service-0-pg", podRequest.Spec.Containers[1].Name) - require.Equal(t, 1, len(podRequest.Spec.Containers[1].Env)) - require.Equal(t, "PG_USERNAME", podRequest.Spec.Containers[1].Env[0].Name) - require.Equal(t, "toto", podRequest.Spec.Containers[1].Env[0].Value) - } - gock.Observe(checkRequest) + }) err := h.SpawnWorker(context.TODO(), hatchery.SpawnArguments{ JobID: 666, NodeRunID: 999, Model: m, - WorkerName: "k8s-toto", + WorkerName: "my-worker", Requirements: []sdk.Requirement{ { Name: "mem", @@ -125,7 +132,7 @@ func TestHatcheryKubernetes_Status(t *testing.T) { }, { Name: "pg", Type: sdk.ServiceRequirement, - Value: "postgresql:5.6.7 PG_USERNAME=toto", + Value: "postgresql:5.6.7 PG_USERNAME=username", }, }, }) diff --git a/engine/hatchery/kubernetes/secrets.go b/engine/hatchery/kubernetes/secrets.go index 3e0c3bd40f..22bae0715a 100644 --- a/engine/hatchery/kubernetes/secrets.go +++ b/engine/hatchery/kubernetes/secrets.go @@ -8,32 +8,49 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/ovh/cds/engine/worker/pkg/workerruntime" "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/slug" ) +// Delete worker model registry and worker config secrets that are not used by any pods. func (h *HatcheryKubernetes) deleteSecrets(ctx context.Context) error { - pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_SECRET}) + pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, LABEL_WORKER_NAME), + }) if err != nil { return sdk.WrapError(err, "cannot get pods with secret") } - secrets, err := h.kubeClient.SecretList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_SECRET}) + secrets, err := h.kubeClient.SecretList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME}) if err != nil { return sdk.WrapError(err, "cannot get secrets") } for _, secret := range secrets.Items { - found := false + secretLabels := secret.GetLabels() + if secretLabels == nil { + continue + } + var found bool for _, pod := range pods.Items { - labels := pod.GetLabels() - if labels != nil && labels[LABEL_SECRET] == secret.Name { + podLabels := pod.GetLabels() + if podLabels == nil { + continue + } + if wm, ok := secretLabels[LABEL_WORKER_MODEL_PATH]; ok && podLabels[LABEL_WORKER_MODEL_PATH] == wm { + found = true + break + } + if w, ok := secretLabels[LABEL_WORKER_NAME]; ok && podLabels[LABEL_WORKER_NAME] == w { found = true break } } if !found { + log.Debug(ctx, "delete secret %q", secret.Name) if err := h.kubeClient.SecretDelete(ctx, h.Config.Namespace, secret.Name, metav1.DeleteOptions{}); err != nil { - log.Error(ctx, "deleteSecrets> Cannot delete secret %s : %v", secret.Name, err) + log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "cannot delete secret %s", secret.Name)) } } } @@ -41,30 +58,58 @@ func (h *HatcheryKubernetes) deleteSecrets(ctx context.Context) error { return nil } -func (h *HatcheryKubernetes) createSecret(ctx context.Context, secretName string, model sdk.Model) error { - if _, err := h.kubeClient.SecretGet(ctx, h.Config.Namespace, secretName, metav1.GetOptions{}); err != nil { - registry := "https://index.docker.io/v1/" - if model.ModelDocker.Registry != "" { - registry = model.ModelDocker.Registry - } - dockerCfg := fmt.Sprintf(`{"auths":{"%s":{"username":"%s","password":"%s"}}}`, registry, model.ModelDocker.Username, model.ModelDocker.Password) - wmSecret := apiv1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: secretName, - Namespace: h.Config.Namespace, - Labels: map[string]string{ - LABEL_SECRET: model.Name, - }, +func (h *HatcheryKubernetes) createRegistrySecret(ctx context.Context, model sdk.Model) (string, error) { + secretName := slug.Convert("cds-worker-registry-" + model.Path()) + + _ = h.kubeClient.SecretDelete(ctx, h.Config.Namespace, secretName, metav1.DeleteOptions{}) + + registry := "https://index.docker.io/v1/" + if model.ModelDocker.Registry != "" { + registry = model.ModelDocker.Registry + } + dockerCfg := fmt.Sprintf(`{"auths":{"%s":{"username":"%s","password":"%s"}}}`, registry, model.ModelDocker.Username, model.ModelDocker.Password) + + if _, err := h.kubeClient.SecretCreate(ctx, h.Config.Namespace, &apiv1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: h.Config.Namespace, + Labels: map[string]string{ + LABEL_HATCHERY_NAME: h.Configuration().Name, + LABEL_WORKER_MODEL_PATH: slug.Convert(model.Path()), }, - Type: apiv1.SecretTypeDockerConfigJson, - StringData: map[string]string{ - apiv1.DockerConfigJsonKey: dockerCfg, + }, + Type: apiv1.SecretTypeDockerConfigJson, + StringData: map[string]string{ + apiv1.DockerConfigJsonKey: dockerCfg, + }, + }, metav1.CreateOptions{}); err != nil { + return "", sdk.WrapError(err, "cannot create secret %s", secretName) + } + + return secretName, nil +} + +func (h *HatcheryKubernetes) createConfigSecret(ctx context.Context, workerConfig workerruntime.WorkerConfig) (string, error) { + secretName := slug.Convert("cds-worker-config-" + workerConfig.Name) + + _ = h.kubeClient.SecretDelete(ctx, h.Config.Namespace, secretName, metav1.DeleteOptions{}) + + if _, err := h.kubeClient.SecretCreate(ctx, h.Config.Namespace, &apiv1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: h.Config.Namespace, + Labels: map[string]string{ + LABEL_HATCHERY_NAME: h.Configuration().Name, + LABEL_WORKER_NAME: workerConfig.Name, }, - } - if _, err := h.kubeClient.SecretCreate(ctx, h.Config.Namespace, &wmSecret, metav1.CreateOptions{}); err != nil { - return sdk.WrapError(err, "Cannot create secret %s", secretName) - } + }, + Type: apiv1.SecretTypeOpaque, + StringData: map[string]string{ + "CDS_CONFIG": workerConfig.EncodeBase64(), + }, + }, metav1.CreateOptions{}); err != nil { + return "", sdk.WrapError(err, "cannot create secret %s", secretName) } - return nil + return secretName, nil } diff --git a/engine/hatchery/kubernetes/services.go b/engine/hatchery/kubernetes/services.go index b87465c5e3..fb27adf6c6 100644 --- a/engine/hatchery/kubernetes/services.go +++ b/engine/hatchery/kubernetes/services.go @@ -2,6 +2,7 @@ package kubernetes import ( "context" + "fmt" "strconv" "strings" "time" @@ -31,7 +32,9 @@ func (h *HatcheryKubernetes) getServicesLogs(ctx context.Context) error { apiWorkerNames[apiWorkers[i].Name] = struct{}{} } - pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: hatchery.LabelServiceJobID}) + pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s", LABEL_HATCHERY_NAME, h.Config.Name, hatchery.LabelServiceJobID), + }) if err != nil { return err } diff --git a/engine/hatchery/kubernetes/services_test.go b/engine/hatchery/kubernetes/services_test.go index 981272f6b2..8a751de402 100644 --- a/engine/hatchery/kubernetes/services_test.go +++ b/engine/hatchery/kubernetes/services_test.go @@ -41,7 +41,7 @@ func Test_serviceLogs(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "pod-name", - Namespace: "kyubi", + Namespace: "cds-workers", Labels: map[string]string{ hatchery.LabelServiceJobID: "666", hatchery.LabelServiceNodeRunID: "999", @@ -65,12 +65,12 @@ func Test_serviceLogs(t *testing.T) { }, }, } - gock.New("http://lolcat.kube").Get("/api/v1/namespaces/hachibi/pods").Reply(http.StatusOK).JSON(podsList) + gock.New("http://lolcat.kube").Get("/api/v1/namespaces/cds-workers/pods").Reply(http.StatusOK).JSON(podsList) gock.New("http://lolcat.kube").AddMatcher(func(r *http.Request, rr *gock.Request) (bool, error) { b, err := gock.MatchPath(r, rr) - assert.NoError(t, err) - if r.Method == http.MethodGet && strings.HasPrefix(r.URL.String(), "http://lolcat.kube/api/v1/namespaces/hachibi/pods/pod-name/log?container=service-666-blabla") { + require.NoError(t, err) + if r.Method == http.MethodGet && strings.HasPrefix(r.URL.String(), "http://lolcat.kube/api/v1/namespaces/cds-workers/pods/pod-name/log?container=service-666-blabla") { if b { return true, nil } diff --git a/engine/hatchery/kubernetes/types.go b/engine/hatchery/kubernetes/types.go index 9bc1cff170..5c08c47688 100644 --- a/engine/hatchery/kubernetes/types.go +++ b/engine/hatchery/kubernetes/types.go @@ -9,10 +9,9 @@ import ( ) const ( - LABEL_HATCHERY_NAME = "CDS_HATCHERY_NAME" - LABEL_WORKER = "CDS_WORKER" - LABEL_SECRET = "CDS_SECRET" - LABEL_WORKER_MODEL = "CDS_WORKER_MODEL" + LABEL_HATCHERY_NAME = "CDS_HATCHERY_NAME" + LABEL_WORKER_NAME = "CDS_WORKER_NAME" + LABEL_WORKER_MODEL_PATH = "CDS_WORKER_MODEL_PATH" ) var containerServiceNameRegexp = regexp.MustCompile(`service-([0-9]+)-(.*)`) diff --git a/engine/hatchery/local/local.go b/engine/hatchery/local/local.go index 70d996ef82..77c225a9bf 100644 --- a/engine/hatchery/local/local.go +++ b/engine/hatchery/local/local.go @@ -83,12 +83,16 @@ func (h *HatcheryLocal) ApplyConfiguration(cfg interface{}) error { // Status returns sdk.MonitoringStatus, implements interface service.Service func (h *HatcheryLocal) Status(ctx context.Context) *sdk.MonitoringStatus { m := h.NewMonitoringStatus() + ws, err := h.WorkersStarted(ctx) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Warn(ctx, err.Error()) + } m.AddLine(sdk.MonitoringStatusLine{ Component: "Workers", - Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), + Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK, }) - return m } @@ -237,7 +241,7 @@ func (h *HatcheryLocal) killWorker(ctx context.Context, name string, workerCmd w // WorkersStarted returns the number of instances started but // not necessarily register on CDS yet -func (h *HatcheryLocal) WorkersStarted(ctx context.Context) []string { +func (h *HatcheryLocal) WorkersStarted(ctx context.Context) ([]string, error) { h.Mutex.Lock() defer h.Mutex.Unlock() workers := make([]string, len(h.workers)) @@ -246,7 +250,7 @@ func (h *HatcheryLocal) WorkersStarted(ctx context.Context) []string { workers[i] = n i++ } - return workers + return workers, nil } // InitHatchery register local hatchery with its worker model diff --git a/engine/hatchery/marathon/marathon.go b/engine/hatchery/marathon/marathon.go index 7b571d6b2d..e901c9bbda 100644 --- a/engine/hatchery/marathon/marathon.go +++ b/engine/hatchery/marathon/marathon.go @@ -87,7 +87,12 @@ func (h *HatcheryMarathon) ApplyConfiguration(cfg interface{}) error { // Status returns sdk.MonitoringStatus, implements interface service.Service func (h *HatcheryMarathon) Status(ctx context.Context) *sdk.MonitoringStatus { m := h.NewMonitoringStatus() - m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) + ws, err := h.WorkersStarted(ctx) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Warn(ctx, err.Error()) + } + m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) return m } @@ -419,18 +424,17 @@ func (h *HatcheryMarathon) listApplications(idPrefix string) ([]string, error) { // WorkersStarted returns the number of instances started but // not necessarily register on CDS yet -func (h *HatcheryMarathon) WorkersStarted(ctx context.Context) []string { +func (h *HatcheryMarathon) WorkersStarted(ctx context.Context) ([]string, error) { apps, err := h.listApplications(h.Config.MarathonIDPrefix) if err != nil { - log.Warn(ctx, "WorkersStarted> error on list applications err:%s", err) - return nil + return nil, sdk.WrapError(err, "error on list applications") } res := make([]string, len(apps)) for i, s := range apps { res[i] = strings.Replace(s, h.Config.MarathonIDPrefix, "", 1) res[i] = strings.TrimPrefix(res[i], "/") } - return res + return res, nil } // InitHatchery only starts killing routine of worker not registered diff --git a/engine/hatchery/marathon/marathon_test.go b/engine/hatchery/marathon/marathon_test.go index e7002477d3..05c58bfcfa 100644 --- a/engine/hatchery/marathon/marathon_test.go +++ b/engine/hatchery/marathon/marathon_test.go @@ -14,6 +14,7 @@ import ( "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/hatchery" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "gopkg.in/h2non/gock.v1" ) @@ -37,7 +38,8 @@ func TestWorkerStarted(t *testing.T) { } gock.New("http://mara.thon").Get("/v2/apps").Reply(200).JSON(apps) - wkrs := h.WorkersStarted(context.TODO()) + wkrs, err := h.WorkersStarted(context.TODO()) + require.NoError(t, err) t.Logf("%+v", wkrs) assert.Equal(t, 2, len(wkrs)) assert.Equal(t, "w1", wkrs[0]) @@ -179,7 +181,6 @@ func TestKillAwolWOrkers(t *testing.T) { err := h.killAwolWorkers() assert.NoError(t, err) assert.True(t, gock.IsDone()) - } func TestSpawn(t *testing.T) { diff --git a/engine/hatchery/openstack/openstack.go b/engine/hatchery/openstack/openstack.go index 72b2f335e3..2d9acbc737 100644 --- a/engine/hatchery/openstack/openstack.go +++ b/engine/hatchery/openstack/openstack.go @@ -91,7 +91,12 @@ func (h *HatcheryOpenstack) ApplyConfiguration(cfg interface{}) error { // Status returns sdk.MonitoringStatus, implements interface service.Service func (h *HatcheryOpenstack) Status(ctx context.Context) *sdk.MonitoringStatus { m := h.NewMonitoringStatus() - m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) + ws, err := h.WorkersStarted(ctx) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Warn(ctx, err.Error()) + } + m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) return m } @@ -487,13 +492,13 @@ func (h *HatcheryOpenstack) deleteServer(ctx context.Context, s servers.Server) // WorkersStarted returns the number of instances started but // not necessarily register on CDS yet -func (h *HatcheryOpenstack) WorkersStarted(ctx context.Context) []string { +func (h *HatcheryOpenstack) WorkersStarted(ctx context.Context) ([]string, error) { srvs := h.getServers(ctx) res := make([]string, len(srvs)) for i, s := range srvs { res[i] = s.Metadata["worker"] } - return res + return res, nil } // NeedRegistration return true if worker model need regsitration diff --git a/engine/hatchery/swarm/swarm.go b/engine/hatchery/swarm/swarm.go index bd898bcad1..85f800f810 100644 --- a/engine/hatchery/swarm/swarm.go +++ b/engine/hatchery/swarm/swarm.go @@ -489,23 +489,21 @@ func (h *HatcherySwarm) CanSpawn(ctx context.Context, model *sdk.Model, jobID in // WorkersStarted returns the number of instances started but // not necessarily register on CDS yet -func (h *HatcherySwarm) WorkersStarted(ctx context.Context) []string { +func (h *HatcherySwarm) WorkersStarted(ctx context.Context) ([]string, error) { ctx, end := telemetry.Span(ctx, "hatchery.WorkersStarted") defer end() - res := make([]string, 0) for _, dockerClient := range h.dockerClients { containers, err := h.getContainers(ctx, dockerClient, types.ContainerListOptions{All: true}) if err != nil { - log.Error(ctx, "hatchery> swarm> WorkersStarted> Unable to list containers: %s", err) - return nil + return nil, sdk.WrapError(err, "unable to list containers") } workers := containers.FilterWorkers() for _, w := range workers { res = append(res, w.Labels[LabelWorkerName]) } } - return res + return res, nil } func (h *HatcherySwarm) GetLogger() *logrus.Logger { diff --git a/engine/hatchery/swarm/swarm_conf.go b/engine/hatchery/swarm/swarm_conf.go index 4146dcb837..1774224ccd 100644 --- a/engine/hatchery/swarm/swarm_conf.go +++ b/engine/hatchery/swarm/swarm_conf.go @@ -61,7 +61,12 @@ func (h *HatcherySwarm) ApplyConfiguration(cfg interface{}) error { // Status returns sdk.MonitoringStatus, implements interface service.Service func (h *HatcherySwarm) Status(ctx context.Context) *sdk.MonitoringStatus { m := h.NewMonitoringStatus() - m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) + ws, err := h.WorkersStarted(ctx) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Warn(ctx, err.Error()) + } + m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) var nbErrorImageList, nbErrorGetContainers int for dockerName, dockerClient := range h.dockerClients { //Check images diff --git a/engine/hatchery/swarm/swarm_test.go b/engine/hatchery/swarm/swarm_test.go index 35c023b316..ff19eed16e 100644 --- a/engine/hatchery/swarm/swarm_test.go +++ b/engine/hatchery/swarm/swarm_test.go @@ -260,7 +260,8 @@ func TestHatcherySwarm_WorkersStarted(t *testing.T) { } gock.New("https://lolcat.host").Get("/v6.66/containers/json").Reply(http.StatusOK).JSON(containers) - s := h.WorkersStarted(context.TODO()) + s, err := h.WorkersStarted(context.TODO()) + require.NoError(t, err) require.Equal(t, 2, len(s)) require.Equal(t, "w1", s[0]) require.Equal(t, "w2", s[1]) diff --git a/engine/hatchery/vsphere/client.go b/engine/hatchery/vsphere/client.go index d18a54978f..e7f6e8e79e 100644 --- a/engine/hatchery/vsphere/client.go +++ b/engine/hatchery/vsphere/client.go @@ -34,7 +34,6 @@ func (h *HatcheryVSphere) getVirtualMachines(ctx context.Context) []mo.VirtualMa if isNotTemplate { result = append(result, vms[i]) } - } return result } diff --git a/engine/hatchery/vsphere/hatchery.go b/engine/hatchery/vsphere/hatchery.go index 72cf9edfd2..ba9f1854ce 100644 --- a/engine/hatchery/vsphere/hatchery.go +++ b/engine/hatchery/vsphere/hatchery.go @@ -87,7 +87,12 @@ func (h *HatcheryVSphere) ApplyConfiguration(cfg interface{}) error { // Status returns sdk.MonitoringStatus, implements interface service.Service func (h *HatcheryVSphere) Status(ctx context.Context) *sdk.MonitoringStatus { m := h.NewMonitoringStatus() - m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(h.WorkersStarted(ctx)), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) + ws, err := h.WorkersStarted(ctx) + if err != nil { + ctx = log.ContextWithStackTrace(ctx, err) + log.Warn(ctx, err.Error()) + } + m.AddLine(sdk.MonitoringStatusLine{Component: "Workers", Value: fmt.Sprintf("%d/%d", len(ws), h.Config.Provision.MaxWorker), Status: sdk.MonitoringStatusOK}) return m } @@ -249,13 +254,13 @@ func (h *HatcheryVSphere) WorkerModelSecretList(m sdk.Model) (sdk.WorkerModelSec // WorkersStarted returns the list of workers started but // not necessarily register on CDS yet -func (h *HatcheryVSphere) WorkersStarted(ctx context.Context) []string { +func (h *HatcheryVSphere) WorkersStarted(ctx context.Context) ([]string, error) { srvs := h.getVirtualMachines(ctx) res := make([]string, 0, len(srvs)) for _, s := range srvs { res = append(res, s.Name) } - return res + return res, nil } // ModelType returns type of hatchery diff --git a/sdk/hatchery/hatchery_test.go b/sdk/hatchery/hatchery_test.go index f10c485aff..3fcdff97ca 100644 --- a/sdk/hatchery/hatchery_test.go +++ b/sdk/hatchery/hatchery_test.go @@ -80,7 +80,7 @@ func TestCreate(t *testing.T) { // This calls are expected for each job received in the channel mockCDSClient.EXPECT().WorkerList(gomock.Any()).Return(nil, nil).AnyTimes() - mockHatchery.EXPECT().WorkersStarted(gomock.Any()).Return(nil).AnyTimes() + mockHatchery.EXPECT().WorkersStarted(gomock.Any()).Return(nil, nil).AnyTimes() mockHatchery.EXPECT().CanSpawn(gomock.Any(), gomock.Any(), int64(666), gomock.Any()).Return(true).AnyTimes() mockCDSClient.EXPECT().QueueJobBook(gomock.Any(), int64(666)).Return(sdk.WorkflowNodeJobRunBooked{}, nil).AnyTimes() mockCDSClient.EXPECT().QueueJobSendSpawnInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() diff --git a/sdk/hatchery/mock_hatchery/interface_mock.go b/sdk/hatchery/mock_hatchery/interface_mock.go index 44f9b15b77..f5fab427d6 100644 --- a/sdk/hatchery/mock_hatchery/interface_mock.go +++ b/sdk/hatchery/mock_hatchery/interface_mock.go @@ -209,11 +209,12 @@ func (mr *MockInterfaceMockRecorder) Type() *gomock.Call { } // WorkersStarted mocks base method. -func (m *MockInterface) WorkersStarted(ctx context.Context) []string { +func (m *MockInterface) WorkersStarted(ctx context.Context) ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WorkersStarted", ctx) ret0, _ := ret[0].([]string) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // WorkersStarted indicates an expected call of WorkersStarted. @@ -472,11 +473,12 @@ func (mr *MockInterfaceWithModelsMockRecorder) WorkerModelsEnabled() *gomock.Cal } // WorkersStarted mocks base method. -func (m *MockInterfaceWithModels) WorkersStarted(ctx context.Context) []string { +func (m *MockInterfaceWithModels) WorkersStarted(ctx context.Context) ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WorkersStarted", ctx) ret0, _ := ret[0].([]string) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // WorkersStarted indicates an expected call of WorkersStarted. diff --git a/sdk/hatchery/pool.go b/sdk/hatchery/pool.go index 56cb737893..7062cd0084 100644 --- a/sdk/hatchery/pool.go +++ b/sdk/hatchery/pool.go @@ -32,7 +32,10 @@ func WorkerPool(ctx context.Context, h Interface, statusFilter ...string) ([]sdk } // Then: get all workers in the orchestrator queue - startedWorkers := h.WorkersStarted(ctx) + startedWorkers, err := h.WorkersStarted(ctx) + if err != nil { + return nil, err + } // Make the union of the two slices allWorkers := make([]sdk.Worker, 0, len(startedWorkers)+len(registeredWorkers)) diff --git a/sdk/hatchery/types.go b/sdk/hatchery/types.go index edc303c857..0631410bb5 100644 --- a/sdk/hatchery/types.go +++ b/sdk/hatchery/types.go @@ -106,7 +106,7 @@ type Interface interface { InitHatchery(ctx context.Context) error SpawnWorker(ctx context.Context, spawnArgs SpawnArguments) error CanSpawn(ctx context.Context, model *sdk.Model, jobID int64, requirements []sdk.Requirement) bool - WorkersStarted(ctx context.Context) []string + WorkersStarted(ctx context.Context) ([]string, error) Service() *sdk.Service CDSClient() cdsclient.Interface Configuration() service.HatcheryCommonConfiguration