Skip to content

Commit

Permalink
Write certificate to local file (flyteorg#421)
Browse files Browse the repository at this point in the history
* Make MetricsBindAddress configurable

Signed-off-by: Kevin Su <[email protected]>

* wip

Signed-off-by: Kevin Su <[email protected]>

* Expose init cert

Signed-off-by: Kevin Su <[email protected]>

* Expose init cert

Signed-off-by: Kevin Su <[email protected]>

* Address webhook issue

Signed-off-by: Kevin Su <[email protected]>

* Fixed tests

Signed-off-by: Kevin Su <[email protected]>

* Make controller manager externally callable

Signed-off-by: Kevin Su <[email protected]>

* Lint

Signed-off-by: Kevin Su <[email protected]>

* comment

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Apr 11, 2022
1 parent c4553bb commit ff9b4e8
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 297 deletions.
228 changes: 3 additions & 225 deletions flytepropeller/cmd/controller/cmd/init_certs.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,17 @@
package cmd

import (
"bytes"
"context"
cryptorand "crypto/rand"

"github.com/flyteorg/flytepropeller/pkg/webhook"

webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config"

"github.com/flyteorg/flytestdlib/logger"
kubeErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/utils"

corev1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config"

"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"os"
"time"
"github.com/flyteorg/flytepropeller/pkg/webhook"

"github.com/spf13/cobra"
)

const (
CaCertKey = "ca.crt"
ServerCertKey = "tls.crt"
ServerCertPrivateKey = "tls.key"
)

// initCertsCmd initializes x509 TLS Certificates and saves them to a secret.
var initCertsCmd = &cobra.Command{
Use: "init-certs",
Expand All @@ -58,207 +33,10 @@ for the Webhook command to mount and read correctly.
`,
Example: "flytepropeller webhook init-certs",
RunE: func(cmd *cobra.Command, args []string) error {
return runCertsCmd(context.Background(), config.GetConfig(), webhookConfig.GetConfig())
return webhook.InitCerts(context.Background(), config.GetConfig(), webhookConfig.GetConfig())
},
}

type webhookCerts struct {
// base64 Encoded CA Cert
CaPEM *bytes.Buffer
// base64 Encoded Server Cert
ServerPEM *bytes.Buffer
// base64 Encoded Server Cert Key
PrivateKeyPEM *bytes.Buffer
}

func init() {
webhookCmd.AddCommand(initCertsCmd)
}

func runCertsCmd(ctx context.Context, propellerCfg *config.Config, cfg *webhookConfig.Config) error {
podNamespace, found := os.LookupEnv(webhook.PodNamespaceEnvVar)
if !found {
podNamespace = podDefaultNamespace
}

logger.Infof(ctx, "Issuing certs")
certs, err := createCerts(podNamespace)
if err != nil {
return err
}

kubeClient, _, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
return err
}

logger.Infof(ctx, "Creating secret [%v] in Namespace [%v]", cfg.SecretName, podNamespace)
err = createWebhookSecret(ctx, podNamespace, cfg, certs, kubeClient.CoreV1().Secrets(podNamespace))
if err != nil {
return err
}

return nil
}

func createWebhookSecret(ctx context.Context, namespace string, cfg *webhookConfig.Config, certs webhookCerts, secretsClient v1.SecretInterface) error {
isImmutable := true
secretData := map[string][]byte{
CaCertKey: certs.CaPEM.Bytes(),
ServerCertKey: certs.ServerPEM.Bytes(),
ServerCertPrivateKey: certs.PrivateKeyPEM.Bytes(),
}

secret := &corev1.Secret{
ObjectMeta: v12.ObjectMeta{
Name: cfg.SecretName,
Namespace: namespace,
},
Type: corev1.SecretTypeOpaque,
Data: secretData,
Immutable: &isImmutable,
}

_, err := secretsClient.Create(ctx, secret, v12.CreateOptions{})
if err == nil {
logger.Infof(ctx, "Created secret [%v]", cfg.SecretName)
return nil
}

if kubeErrors.IsAlreadyExists(err) {
logger.Infof(ctx, "A secret already exists with the same name. Validating.")
s, err := secretsClient.Get(ctx, cfg.SecretName, v12.GetOptions{})
if err != nil {
return err
}

// If ServerCertKey or ServerCertPrivateKey are missing, update
requiresUpdate := false
for key := range secretData {
if key == CaCertKey {
continue
}

if _, exists := s.Data[key]; !exists {
requiresUpdate = true
break
}
}

if requiresUpdate {
logger.Infof(ctx, "The existing secret is missing one or more keys.")
secret.Annotations = map[string]string{
"flyteLastUpdate": "system-updated",
"flyteUpdatedAt": time.Now().String(),
}

_, err = secretsClient.Update(ctx, secret, v12.UpdateOptions{})
if err != nil && kubeErrors.IsConflict(err) {
logger.Infof(ctx, "Another instance of flyteadmin has updated the same secret. Ignoring this update")
err = nil
}

return err
}

return nil
}

return err
}

func createCerts(serviceNamespace string) (certs webhookCerts, err error) {
// CA config
caRequest := &x509.Certificate{
SerialNumber: big.NewInt(2021),
Subject: pkix.Name{
Organization: []string{"flyte.org"},
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1, 0, 0),
IsCA: true,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}

// CA private key
caPrivateKey, err := rsa.GenerateKey(cryptorand.Reader, 4096)
if err != nil {
return webhookCerts{}, err
}

// Self signed CA certificate
caCert, err := x509.CreateCertificate(cryptorand.Reader, caRequest, caRequest, &caPrivateKey.PublicKey, caPrivateKey)
if err != nil {
return webhookCerts{}, err
}

// PEM encode CA cert
caPEM := new(bytes.Buffer)
err = pem.Encode(caPEM, &pem.Block{
Type: "CERTIFICATE",
Bytes: caCert,
})
if err != nil {
return webhookCerts{}, err
}

dnsNames := []string{"flyte-pod-webhook",
"flyte-pod-webhook." + serviceNamespace, "flyte-pod-webhook." + serviceNamespace + ".svc"}
commonName := "flyte-pod-webhook." + serviceNamespace + ".svc"

// server cert config
certRequest := &x509.Certificate{
DNSNames: dnsNames,
SerialNumber: big.NewInt(1658),
Subject: pkix.Name{
CommonName: commonName,
Organization: []string{"flyte.org"},
},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(1, 0, 0),
SubjectKeyId: []byte{1, 2, 3, 4, 6},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature,
}

// server private key
serverPrivateKey, err := rsa.GenerateKey(cryptorand.Reader, 4096)
if err != nil {
return webhookCerts{}, err
}

// sign the server cert
cert, err := x509.CreateCertificate(cryptorand.Reader, certRequest, caRequest, &serverPrivateKey.PublicKey, caPrivateKey)
if err != nil {
return webhookCerts{}, err
}

// PEM encode the server cert and key
serverCertPEM := new(bytes.Buffer)
err = pem.Encode(serverCertPEM, &pem.Block{
Type: "CERTIFICATE",
Bytes: cert,
})

if err != nil {
return webhookCerts{}, fmt.Errorf("failed to Encode CertPEM. Error: %w", err)
}

serverPrivKeyPEM := new(bytes.Buffer)
err = pem.Encode(serverPrivKeyPEM, &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(serverPrivateKey),
})

if err != nil {
return webhookCerts{}, fmt.Errorf("failed to Encode Cert Private Key. Error: %w", err)
}

return webhookCerts{
CaPEM: caPEM,
ServerPEM: serverCertPEM,
PrivateKeyPEM: serverPrivKeyPEM,
}, nil
}
38 changes: 33 additions & 5 deletions flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"runtime"

"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"golang.org/x/sync/errgroup"
"k8s.io/klog"

config2 "github.com/flyteorg/flytepropeller/pkg/controller/config"
Expand Down Expand Up @@ -103,12 +105,38 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(baseCtx)

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, cfg.ProfilerPort.Port, nil)
// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace)
mgr, err := controller.CreateControllerManager(ctx, cfg, defaultNamespace, &propellerScope)
if err != nil {
logger.Fatalf(ctx, "Failed to create controller manager. Error: %v", err)
return err
}

g, childCtx := errgroup.WithContext(ctx)
g.Go(func() error {
err := profutils.StartProfilingServerWithDefaultHandlers(childCtx, cfg.ProfilerPort.Port, nil)
if err != nil {
logger.Fatalf(childCtx, "Failed to Start profiling and metrics server. Error: %v", err)
}
return err
})

g.Go(func() error {
err := controller.StartControllerManager(childCtx, mgr)
if err != nil {
logger.Fatalf(childCtx, "Failed to start controller manager. Error: %v", err)
}
return err
})

g.Go(func() error {
err := controller.StartController(childCtx, cfg, defaultNamespace, mgr, &propellerScope)
if err != nil {
logger.Fatalf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
logger.Fatalf(childCtx, "Failed to start controller. Error: %v", err)
}
}()
return err
})

return controller.StartController(ctx, cfg, defaultNamespace)
return g.Wait()
}
44 changes: 35 additions & 9 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package cmd
import (
"context"

"github.com/flyteorg/flytepropeller/pkg/controller"
"github.com/flyteorg/flytestdlib/promutils"
"golang.org/x/sync/errgroup"

webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config"
"github.com/flyteorg/flytestdlib/profutils"

Expand All @@ -13,10 +17,6 @@ import (
"github.com/spf13/cobra"
)

const (
podDefaultNamespace = "default"
)

var webhookCmd = &cobra.Command{
Use: "webhook",
Aliases: []string{"webhooks"},
Expand Down Expand Up @@ -82,11 +82,37 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(origContext)

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, propellerCfg.ProfilerPort.Port, nil)
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace)
mgr, err := controller.CreateControllerManager(ctx, propellerCfg, defaultNamespace, &propellerScope)
if err != nil {
logger.Fatalf(ctx, "Failed to create controller manager. Error: %v", err)
return err
}

g, childCtx := errgroup.WithContext(ctx)
g.Go(func() error {
err := profutils.StartProfilingServerWithDefaultHandlers(childCtx, propellerCfg.ProfilerPort.Port, nil)
if err != nil {
logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
logger.Fatalf(childCtx, "Failed to Start profiling and metrics server. Error: %v", err)
}
}()
return webhook.Run(ctx, propellerCfg, cfg, defaultNamespace)
return err
})

g.Go(func() error {
err := controller.StartControllerManager(childCtx, mgr)
if err != nil {
logger.Fatalf(childCtx, "Failed to start controller manager. Error: %v", err)
}
return err
})

g.Go(func() error {
err := webhook.Run(childCtx, propellerCfg, cfg, defaultNamespace, &propellerScope, mgr)
if err != nil {
logger.Fatalf(childCtx, "Failed to start webhook. Error: %v", err)
}
return err
})

return g.Wait()
}
1 change: 1 addition & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
golang.org/x/tools v0.1.10 // indirect
google.golang.org/grpc v1.36.0
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ff9b4e8

Please sign in to comment.