Skip to content

Commit

Permalink
feat(hatchery/kubernetes): various improvement (#5679)
Browse files Browse the repository at this point in the history
* new interface to have a cleaner interface to k8s API
* don't get the namespace at startup (to reduce the expected roles in k8s RBAC)
* don't try to create the namespace (to reduce the expected roles in k8s RBAC)

Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin authored Feb 12, 2021
1 parent eb557fb commit ca4786a
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 140 deletions.
4 changes: 2 additions & 2 deletions engine/hatchery/kubernetes/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func NewHatcheryKubernetesTest(t *testing.T) *HatcheryKubernetes {
clientSet, errCl := kubernetes.NewForConfig(&rest.Config{Host: "http://lolcat.kube"})
require.NoError(t, errCl)

h.k8sClient = clientSet
gock.InterceptClient(h.k8sClient.CoreV1().RESTClient().(*rest.RESTClient).Client)
h.kubeClient = &kubernetesClient{clientSet}
gock.InterceptClient(clientSet.CoreV1().RESTClient().(*rest.RESTClient).Client)

h.Config.Name = "kyubi"
h.Config.Namespace = "hachibi"
Expand Down
7 changes: 3 additions & 4 deletions engine/hatchery/kubernetes/kill_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func (h *HatcheryKubernetes) killAwolWorkers(ctx context.Context) error {
pods, err := h.k8sClient.CoreV1().Pods(h.Config.Namespace).List(metav1.ListOptions{LabelSelector: LABEL_WORKER})
pods, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_WORKER})
if err != nil {
return err
}
Expand All @@ -35,7 +35,7 @@ func (h *HatcheryKubernetes) killAwolWorkers(ctx context.Context) error {
}

// If no job identifiers, no services on pod
jobIdentifiers := h.getJobIdentiers(labels)
jobIdentifiers := getJobIdentiers(labels)
if jobIdentifiers != nil {
// Browse container to send end log for each service
servicesLogs := make([]cdslog.Message, 0)
Expand Down Expand Up @@ -97,9 +97,8 @@ func (h *HatcheryKubernetes) killAwolWorkers(ctx context.Context) error {
log.Error(ctx, "killAndRemove> error on call client.WorkerModelSpawnError on worker model %s for register: %s", modelPath, err)
}
}

}
if err := h.k8sClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
if err := h.kubeClient.PodDelete(ctx, pod.Namespace, pod.Name, nil); err != nil {
globalErr = err
log.Error(ctx, "hatchery:kubernetes> killAwolWorkers> Cannot delete pod %s (%s)", pod.Name, err)
}
Expand Down
108 changes: 12 additions & 96 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"html/template"
"os"
"strconv"
"strings"
"time"
Expand All @@ -14,14 +13,11 @@ import (
"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/sirupsen/logrus"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/service"
Expand Down Expand Up @@ -80,71 +76,16 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error {
return fmt.Errorf("Invalid configuration")
}

var errCl error
var clientSet *kubernetes.Clientset
k8sTimeout := time.Second * 10

if h.Config.KubernetesConfigFile != "" {
cfg, err := clientcmd.BuildConfigFromFlags(h.Config.KubernetesMasterURL, h.Config.KubernetesConfigFile)
if err != nil {
return sdk.WrapError(err, "Cannot build config from flags")
}
cfg.Timeout = k8sTimeout

clientSet, errCl = kubernetes.NewForConfig(cfg)
if errCl != nil {
return sdk.WrapError(errCl, "Cannot create client with newForConfig")
}
} else if h.Config.KubernetesMasterURL != "" {
configK8s, err := clientcmd.BuildConfigFromKubeconfigGetter(h.Config.KubernetesMasterURL, h.getStartingConfig)
if err != nil {
return sdk.WrapError(err, "Cannot build config from config getter")
}
configK8s.Timeout = k8sTimeout

if h.Config.KubernetesCertAuthData != "" {
configK8s.TLSClientConfig = rest.TLSClientConfig{
CAData: []byte(h.Config.KubernetesCertAuthData),
CertData: []byte(h.Config.KubernetesClientCertData),
KeyData: []byte(h.Config.KubernetesClientKeyData),
}
}

// creates the clientset
clientSet, errCl = kubernetes.NewForConfig(configK8s)
if errCl != nil {
return sdk.WrapError(errCl, "Cannot create new config")
}
} else {
config, err := rest.InClusterConfig()
if err != nil {
return sdk.WrapError(err, "Unable to configure k8s InClusterConfig")
}

clientSet, errCl = kubernetes.NewForConfig(config)
if errCl != nil {
return sdk.WrapError(errCl, "Unable to configure k8s client with InClusterConfig")
}

}

h.k8sClient = clientSet

if h.Config.Namespace != apiv1.NamespaceDefault {
if _, err := clientSet.CoreV1().Namespaces().Get(h.Config.Namespace, metav1.GetOptions{}); err != nil {
ns := apiv1.Namespace{}
ns.SetName(h.Config.Namespace)
if _, errC := clientSet.CoreV1().Namespaces().Create(&ns); errC != nil {
return sdk.WrapError(errC, "Cannot create namespace %s in kubernetes", h.Config.Namespace)
}
}
var err error
h.kubeClient, err = initKubeClient(h.Config)
if err != nil {
return err
}

h.Common.Common.ServiceName = h.Config.Name
h.Common.Common.ServiceType = sdk.TypeHatchery
h.HTTPURL = h.Config.URL
h.MaxHeartbeatFailures = h.Config.API.MaxHeartbeatFailures
var err error
h.Common.Common.PrivateKey, err = jwt.ParseRSAPrivateKeyFromPEM([]byte(h.Config.RSAPrivateKey))
if err != nil {
return fmt.Errorf("unable to parse RSA private Key: %v", err)
Expand All @@ -161,42 +102,19 @@ func (h *HatcheryKubernetes) Status(ctx context.Context) *sdk.MonitoringStatus {
return m
}

// getStartingConfig implements ConfigAccess
func (h *HatcheryKubernetes) getStartingConfig() (*clientcmdapi.Config, error) {
defaultClientConfigRules := clientcmd.NewDefaultClientConfigLoadingRules()
overrideCfg := clientcmd.ConfigOverrides{
AuthInfo: clientcmdapi.AuthInfo{
Username: h.Config.KubernetesUsername,
Password: h.Config.KubernetesPassword,
Token: h.Config.KubernetesToken,
},
}

clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(defaultClientConfigRules, &overrideCfg)
rawConfig, err := clientConfig.RawConfig()
if os.IsNotExist(err) {
return clientcmdapi.NewConfig(), nil
}
if err != nil {
return nil, err
}

return &rawConfig, nil
}

// CheckConfiguration checks the validity of the configuration object
func (h *HatcheryKubernetes) CheckConfiguration(cfg interface{}) error {
hconfig, ok := cfg.(HatcheryConfiguration)
if !ok {
return fmt.Errorf("Invalid hatchery kubernetes configuration")
return sdk.WithStack(fmt.Errorf("invalid hatchery kubernetes configuration"))
}

if err := hconfig.Check(); err != nil {
return fmt.Errorf("Invalid hatchery kubernetes configuration: %v", err)
return sdk.WithStack(fmt.Errorf("invalid hatchery kubernetes configuration: %v", err))
}

if hconfig.Namespace == "" {
return fmt.Errorf("please enter a valid kubernetes namespace")
return sdk.WithStack(fmt.Errorf("missing valid kubernetes namespace"))
}

return nil
Expand Down Expand Up @@ -391,7 +309,7 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
// Check here to add secret if needed
secretName := "cds-credreg-" + spawnArgs.Model.Name
if spawnArgs.Model.ModelDocker.Private {
if err := h.createSecret(secretName, *spawnArgs.Model); err != nil {
if err := h.createSecret(ctx, secretName, *spawnArgs.Model); err != nil {
return sdk.WrapError(err, "cannot create secret for model %s", spawnArgs.Model.Path())
}
podSchema.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{{Name: secretName}}
Expand Down Expand Up @@ -447,10 +365,8 @@ func (h *HatcheryKubernetes) SpawnWorker(ctx context.Context, spawnArgs hatchery
podSchema.Spec.HostAliases[0].Hostnames[i+1] = strings.ToLower(serv.Name)
}

_, err := h.k8sClient.CoreV1().Pods(h.Config.Namespace).Create(&podSchema)

_, err := h.kubeClient.PodCreate(ctx, h.Config.Namespace, &podSchema)
log.Debug(ctx, "hatchery> kubernetes> SpawnWorker> %s > Pod created", spawnArgs.WorkerName)

return err
}

Expand All @@ -461,7 +377,7 @@ func (h *HatcheryKubernetes) GetLogger() *logrus.Logger {
// WorkersStarted returns the number of instances started but
// not necessarily register on CDS yet
func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) []string {
list, err := h.k8sClient.CoreV1().Pods(h.Config.Namespace).List(metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME})
list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_HATCHERY_NAME})
if err != nil {
log.Warn(ctx, "WorkersStarted> unable to list pods on namespace %s", h.Config.Namespace)
return nil
Expand All @@ -479,7 +395,7 @@ func (h *HatcheryKubernetes) WorkersStarted(ctx context.Context) []string {
// WorkersStartedByModel returns the number of instances of given model started but
// not necessarily register on CDS yet
func (h *HatcheryKubernetes) WorkersStartedByModel(ctx context.Context, model *sdk.Model) int {
list, err := h.k8sClient.CoreV1().Pods(h.Config.Namespace).List(metav1.ListOptions{LabelSelector: LABEL_WORKER_MODEL})
list, err := h.kubeClient.PodList(ctx, h.Config.Namespace, metav1.ListOptions{LabelSelector: LABEL_WORKER_MODEL})
if err != nil {
log.Error(ctx, "WorkersStartedByModel> Cannot get list of workers started (%s)", err)
return 0
Expand Down
174 changes: 174 additions & 0 deletions engine/hatchery/kubernetes/kubernetes_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package kubernetes

import (
"context"
"os"
"time"

"github.com/ovh/cds/sdk"
"github.com/rockbears/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

const (
logNS = log.Field("k8s_ns")
logPod = log.Field("k8s_pod")
)

func init() {
log.RegisterField(logNS, logPod)
}

func initKubeClient(config HatcheryConfiguration) (KubernetesClient, error) {
k8sTimeout := time.Second * 10

if config.KubernetesConfigFile != "" {
cfg, err := clientcmd.BuildConfigFromFlags(config.KubernetesMasterURL, config.KubernetesConfigFile)
if err != nil {
return nil, sdk.WrapError(err, "Cannot build config from flags")
}
cfg.Timeout = k8sTimeout

clientSet, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, sdk.WrapError(err, "Cannot create client with newForConfig")
}
return &kubernetesClient{clientSet}, nil
}

if config.KubernetesMasterURL != "" {
configK8s, err := clientcmd.BuildConfigFromKubeconfigGetter(config.KubernetesMasterURL, getStartingConfig(config))
if err != nil {
return nil, sdk.WrapError(err, "Cannot build config from config getter")
}
configK8s.Timeout = k8sTimeout

if config.KubernetesCertAuthData != "" {
configK8s.TLSClientConfig = rest.TLSClientConfig{
CAData: []byte(config.KubernetesCertAuthData),
CertData: []byte(config.KubernetesClientCertData),
KeyData: []byte(config.KubernetesClientKeyData),
}
}

// creates the clientset
clientSet, err := kubernetes.NewForConfig(configK8s)
if err != nil {
return nil, sdk.WrapError(err, "Cannot create new config")
}

return &kubernetesClient{clientSet}, nil
}

cfg, err := rest.InClusterConfig()
if err != nil {
return nil, sdk.WrapError(err, "Unable to configure k8s InClusterConfig")
}

clientSet, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, sdk.WrapError(err, "Unable to configure k8s client with InClusterConfig")
}

return &kubernetesClient{clientSet}, nil
}

// getStartingConfig implements ConfigAccess
func getStartingConfig(config HatcheryConfiguration) func() (*clientcmdapi.Config, error) {
return func() (*clientcmdapi.Config, error) {
defaultClientConfigRules := clientcmd.NewDefaultClientConfigLoadingRules()
overrideCfg := clientcmd.ConfigOverrides{
AuthInfo: clientcmdapi.AuthInfo{
Username: config.KubernetesUsername,
Password: config.KubernetesPassword,
Token: config.KubernetesToken,
},
}

clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(defaultClientConfigRules, &overrideCfg)
rawConfig, err := clientConfig.RawConfig()
if os.IsNotExist(err) {
return clientcmdapi.NewConfig(), nil
}
if err != nil {
return nil, err
}

return &rawConfig, nil
}
}

type KubernetesClient interface {
PodCreate(ctx context.Context, ns string, spec *corev1.Pod) (*corev1.Pod, error)
PodDelete(ctx context.Context, ns string, name string, options *metav1.DeleteOptions) error
PodGetRawLogs(ctx context.Context, ns string, name string, options *corev1.PodLogOptions) ([]byte, error)
PodList(ctx context.Context, ns string, options metav1.ListOptions) (*corev1.PodList, error)
SecretCreate(ctx context.Context, ns string, spec *corev1.Secret) (*corev1.Secret, error)
SecretDelete(ctx context.Context, ns string, name string, options *metav1.DeleteOptions) error
SecretGet(ctx context.Context, ns string, name string, options *metav1.GetOptions) (*corev1.Secret, error)
SecretList(ctx context.Context, ns string, options metav1.ListOptions) (*corev1.SecretList, error)
}

type kubernetesClient struct {
client *kubernetes.Clientset
}

var (
_ KubernetesClient = new(kubernetesClient)
)

func (k *kubernetesClient) PodCreate(ctx context.Context, ns string, spec *corev1.Pod) (*corev1.Pod, error) {
ctx = context.WithValue(ctx, logNS, ns)
ctx = context.WithValue(ctx, logPod, spec.Name)
log.Info(ctx, "creating pod %s", spec.Name)
pod, err := k.client.CoreV1().Pods(ns).Create(spec)
return pod, sdk.WrapError(err, "unable to create pod %s", spec.Name)
}

func (k *kubernetesClient) PodDelete(ctx context.Context, ns string, name string, options *metav1.DeleteOptions) error {
ctx = context.WithValue(ctx, logNS, ns)
ctx = context.WithValue(ctx, logPod, name)
log.Info(ctx, "deleting pod %s", name)
err := k.client.CoreV1().Pods(ns).Delete(name, options)
return sdk.WrapError(err, "unable to delete pod %s", name)
}

func (k *kubernetesClient) PodList(ctx context.Context, ns string, opts metav1.ListOptions) (*corev1.PodList, error) {
ctx = context.WithValue(ctx, logNS, ns)
log.Info(ctx, "listing pod")
pods, err := k.client.CoreV1().Pods(ns).List(opts)
return pods, sdk.WrapError(err, "unable to list pods in namespace %s", ns)
}

func (k *kubernetesClient) SecretCreate(ctx context.Context, ns string, spec *corev1.Secret) (*corev1.Secret, error) {
secret, err := k.client.CoreV1().Secrets(ns).Create(spec)
return secret, sdk.WrapError(err, "unable to create secret %s", spec.Name)
}

func (k *kubernetesClient) SecretDelete(ctx context.Context, ns string, name string, options *metav1.DeleteOptions) error {
err := k.client.CoreV1().Secrets(ns).Delete(name, options)
return sdk.WrapError(err, "unable to delete secret %s", name)
}

func (k *kubernetesClient) SecretGet(ctx context.Context, ns string, name string, options *metav1.GetOptions) (*corev1.Secret, error) {
secret, err := k.client.CoreV1().Secrets(ns).Get(name, *options)
return secret, sdk.WrapError(err, "unable to get secret %s", name)
}

func (k *kubernetesClient) SecretList(ctx context.Context, ns string, options metav1.ListOptions) (*corev1.SecretList, error) {
secrets, err := k.client.CoreV1().Secrets(ns).List(options)
return secrets, sdk.WrapError(err, "unable to list secrets in namespace %s", ns)
}

func (k *kubernetesClient) PodGetRawLogs(ctx context.Context, ns string, name string, options *corev1.PodLogOptions) ([]byte, error) {
ctx = context.WithValue(ctx, logNS, ns)
ctx = context.WithValue(ctx, logPod, name)
log.Debug(ctx, "get logs for pod %s", name)
logs, err := k.client.CoreV1().Pods(ns).GetLogs(name, options).DoRaw()
return logs, sdk.WrapError(err, "unable to get pod %s raw logs", name)
}
Loading

0 comments on commit ca4786a

Please sign in to comment.