Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Worker Certificates Renewal Endpoints #565

Merged
merged 21 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/k8s/api/v1/certificates_refresh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package apiv1

type (
	// RefreshCertificatesPlanResponse is the response body of the
	// refresh-certs "plan" step.
	RefreshCertificatesPlanResponse struct {
		// Seed is the value generated for this refresh; it is sent back in
		// the "run" request.
		Seed int `json:"seed"`
		// CertificatesSigningRequests lists the names of the
		// CertificateSigningRequests that the "run" step will create.
		CertificatesSigningRequests []string `json:"certificates-signing-requests"`
	}

	// RefreshCertificatesRunRequest is the request body of the
	// refresh-certs "run" step.
	RefreshCertificatesRunRequest struct {
		// Seed is the value previously returned by the "plan" step.
		Seed int `json:"seed"`
		// ExpirationSeconds is the requested certificate lifetime, in seconds.
		ExpirationSeconds int `json:"expiration-seconds"`
	}

	// RefreshCertificatesRunResponse is the response body of the
	// refresh-certs "run" step.
	RefreshCertificatesRunResponse struct {
		// ExpirationSeconds is the lifetime of the issued certificates, in
		// seconds.
		ExpirationSeconds int `json:"expiration-seconds"`
	}
)
2 changes: 1 addition & 1 deletion src/k8s/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/pelletier/go-toml v1.9.5
github.com/spf13/cobra v1.8.1
golang.org/x/net v0.27.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
gopkg.in/yaml.v2 v2.4.0
helm.sh/helm/v3 v3.15.3
Expand Down Expand Up @@ -150,7 +151,6 @@ require (
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
79 changes: 79 additions & 0 deletions src/k8s/pkg/client/kubernetes/certificate_signing_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package kubernetes

import (
"context"
"fmt"
"time"

"github.com/canonical/k8s/pkg/log"
certificatesv1 "k8s.io/api/certificates/v1"
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

// WatchCertificateSigningRequest watches a CertificateSigningRequest with the
// given name and calls a verify function on each event.
// WatchCertificateSigningRequest will continue watching the CSR until the
// verify function returns true or an non-retriable error occurs.
//
// The verify function should return true if the CSR is valid and processing
// should stop.
// The verify function should return false if the CSR is not yet valid and
// processing should continue.
// The verify function should return an error if the CSR is in an invalid state
// (e.g., failed or denied) or the issued certificate is invalid.
func (c *Client) WatchCertificateSigningRequest(ctx context.Context, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) error {
log := log.FromContext(ctx)
for {
w, err := c.CertificatesV1().CertificateSigningRequests().Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name}))
if err != nil {
log.V(1).Info("Failed to watch CSR", "error", err)
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if retry, err := c.watchCertificateSigningRequestEvents(ctx, w, name, verify); err != nil {
return fmt.Errorf("failed to watch CSR %s: %w", name, err)
} else if !retry {
return nil
}

w.Stop()
log.V(1).Info("Retrying to watch CSR")
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(3 * time.Second):
}
}
}

func (c *Client) watchCertificateSigningRequestEvents(ctx context.Context, w watch.Interface, name string, verify func(csr *certificatesv1.CertificateSigningRequest) (bool, error)) (bool, error) {
log := log.FromContext(ctx)
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ctx.Done():
return false, ctx.Err()
case evt, ok := <-w.ResultChan():
if !ok {
log.V(1).Info("Watch closed")
// Retry
return true, nil
}

csr, ok := evt.Object.(*certificatesv1.CertificateSigningRequest)
if !ok {
log.V(1).Info("Expected a CertificateSigningRequest but received something else", "object", evt.Object)
// Retry
return true, nil
}

if valid, err := verify(csr); err != nil {
// Stop watching and return the error
return false, fmt.Errorf("failed to verify CSR %s: %w", name, err)
} else if valid {
return false, nil
}
}
}
}
257 changes: 257 additions & 0 deletions src/k8s/pkg/k8sd/api/certificates_refresh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package api

import (
"crypto/x509/pkix"
"fmt"
"math"
"math/rand"
"net"
"net/http"
"path/filepath"

apiv1 "github.com/canonical/k8s/api/v1"
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/k8sd/pki"
"github.com/canonical/k8s/pkg/k8sd/setup"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
"github.com/canonical/k8s/pkg/utils"
pkiutil "github.com/canonical/k8s/pkg/utils/pki"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/v2/state"
"golang.org/x/sync/errgroup"
certificatesv1 "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// postRefreshCertsPlan handles the refresh-certs "plan" request. It generates
// a random seed and, on worker nodes, also returns the names of the
// CertificateSigningRequests derived from that seed. On other nodes only the
// seed is returned.
func (e *Endpoints) postRefreshCertsPlan(s state.State, r *http.Request) response.Response {
	planSeed := rand.Intn(math.MaxInt)

	worker, err := snaputil.IsWorker(e.provider.Snap())
	if err != nil {
		return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err))
	}

	plan := apiv1.RefreshCertificatesPlanResponse{Seed: planSeed}
	if worker {
		plan.CertificatesSigningRequests = []string{
			fmt.Sprintf("k8sd-%d-worker-kubelet-serving", planSeed),
			fmt.Sprintf("k8sd-%d-worker-kubelet-client", planSeed),
			fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", planSeed),
		}
	}
	return response.SyncResponse(true, plan)
}

// postRefreshCertsRun handles the refresh-certs "run" request. Worker nodes
// are delegated to refreshCertsRunWorker; any other node currently gets a
// "not implemented yet" internal error.
func (e *Endpoints) postRefreshCertsRun(s state.State, r *http.Request) response.Response {
	node := e.provider.Snap()

	worker, err := snaputil.IsWorker(node)
	if err != nil {
		return response.InternalError(fmt.Errorf("failed to check if node is a worker: %w", err))
	}

	if !worker {
		// TODO: Control Plane refresh
		return response.InternalError(fmt.Errorf("not implemented yet"))
	}
	return refreshCertsRunWorker(s, r, node)
}

// refreshCertsRunWorker refreshes the certificates for a worker node
func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) response.Response {
log := log.FromContext(r.Context())

req := apiv1.RefreshCertificatesRunRequest{}
if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil {
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

client, err := snap.KubernetesNodeClient("")
if err != nil {
return response.InternalError(fmt.Errorf("failed to get Kubernetes client: %w", err))
}

var certificates pki.WorkerNodePKI

clusterConfig, err := databaseutil.GetClusterConfig(r.Context(), s)
if err != nil {
return response.InternalError(fmt.Errorf("failed to get cluster configuration: %w", err))
}

if clusterConfig.Certificates.CACert == nil || clusterConfig.Certificates.ClientCACert == nil {
return response.InternalError(fmt.Errorf("missing CA certificates"))
}

certificates.CACert = clusterConfig.Certificates.GetCACert()
certificates.ClientCACert = clusterConfig.Certificates.GetClientCACert()

g, ctx := errgroup.WithContext(r.Context())

for _, csr := range []struct {
name string
commonName string
organization []string
usages []certificatesv1.KeyUsage
hostnames []string
ips []net.IP
signerName string
certificate *string
key *string
}{
{
name: fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed),
commonName: fmt.Sprintf("system:node:%s", snap.Hostname()),
organization: []string{"system:nodes"},
usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageServerAuth},
hostnames: []string{snap.Hostname()},
ips: []net.IP{net.ParseIP(s.Address().Hostname())},
signerName: "k8sd.io/kubelet-serving",
certificate: &certificates.KubeletCert,
key: &certificates.KubeletKey,
},
{
name: fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed),
commonName: fmt.Sprintf("system:node:%s", snap.Hostname()),
organization: []string{"system:nodes"},
usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageClientAuth},
signerName: "k8sd.io/kubelet-client",
certificate: &certificates.KubeletClientCert,
key: &certificates.KubeletClientKey,
},
{
name: fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed),
commonName: "system:kube-proxy",
usages: []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageKeyEncipherment, certificatesv1.UsageClientAuth},
signerName: "k8sd.io/kube-proxy-client",
certificate: &certificates.KubeProxyClientCert,
key: &certificates.KubeProxyClientKey,
},
} {
csr := csr
g.Go(func() error {
csrPEM, keyPEM, err := pkiutil.GenerateCSR(
pkix.Name{
CommonName: csr.commonName,
Organization: csr.organization,
},
2048,
csr.hostnames,
csr.ips,
)
if err != nil {
return fmt.Errorf("failed to generate CSR for %s: %w", csr.name, err)
}

if _, err = client.CertificatesV1().CertificateSigningRequests().Create(ctx, &certificatesv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: csr.name,
},
Spec: certificatesv1.CertificateSigningRequestSpec{
Request: []byte(csrPEM),
Usages: csr.usages,
SignerName: csr.signerName,
},
}, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create CSR for %s: %w", csr.name, err)
}

if err := client.WatchCertificateSigningRequest(
ctx,
csr.name,
func(request *certificatesv1.CertificateSigningRequest) (bool, error) {
return verifyCSRAndSetPKI(request, keyPEM, csr.certificate, csr.key)
},
); err != nil {
log.Error(err, "Failed to watch CSR")
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("certificate signing request failed: %w", err)
}

return nil

})

}

if err := g.Wait(); err != nil {
return response.InternalError(fmt.Errorf("failed to get worker node certificates: %w", err))
}

if _, err = setup.EnsureWorkerPKI(snap, &certificates); err != nil {
return response.InternalError(fmt.Errorf("failed to write worker PKI: %w", err))
}

// Kubeconfigs
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), "127.0.0.1:6443", certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil {
return response.InternalError(fmt.Errorf("failed to generate kubelet kubeconfig: %w", err))
}
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), "127.0.0.1:6443", certificates.CACert, certificates.KubeProxyClientCert, certificates.KubeProxyClientKey); err != nil {
return response.InternalError(fmt.Errorf("failed to generate kube-proxy kubeconfig: %w", err))
}

// Restart the services
if err := snap.RestartService(r.Context(), "kubelet"); err != nil {
return response.InternalError(fmt.Errorf("failed to restart kubelet: %w", err))
}
if err := snap.RestartService(r.Context(), "kube-proxy"); err != nil {
return response.InternalError(fmt.Errorf("failed to restart kube-proxy: %w", err))
}

cert, _, err := pkiutil.LoadCertificate(certificates.KubeletCert, "")
if err != nil {
return response.InternalError(fmt.Errorf("failed to load kubelet certificate: %w", err))
}

expirationDuration := cert.NotAfter.Sub(cert.NotBefore)
return response.SyncResponse(true, apiv1.RefreshCertificatesRunResponse{
ExpirationSeconds: int(expirationDuration.Seconds()),
})

}

// isCertificateSigningRequestApprovedAndIssued checks if the certificate
// signing request is approved and issued. It returns true if the CSR is
// approved and its status carries a certificate, false if it is still
// pending, and an error if it is denied or failed.
//
// All conditions are inspected before a verdict is reached, so a Denied or
// Failed condition is reported regardless of where it appears relative to an
// Approved condition.
func isCertificateSigningRequestApprovedAndIssued(csr *certificatesv1.CertificateSigningRequest) (bool, error) {
	approved := false
	for _, condition := range csr.Status.Conditions {
		if condition.Status != corev1.ConditionTrue {
			continue
		}
		switch condition.Type {
		case certificatesv1.CertificateDenied:
			return false, fmt.Errorf("CSR %s was denied: %s", csr.Name, condition.Reason)
		case certificatesv1.CertificateFailed:
			return false, fmt.Errorf("CSR %s failed: %s", csr.Name, condition.Reason)
		case certificatesv1.CertificateApproved:
			approved = true
		}
	}
	// Approved but without a certificate yet means the signer has not issued
	// it; keep waiting.
	return approved && len(csr.Status.Certificate) > 0, nil
}

// verifyCSRAndSetPKI inspects the certificate signing request and, once it is
// approved and issued, stores the issued certificate in *certificate and
// keyPEM in *key.
//
// It returns true when the outputs have been set, false when the CSR is not
// approved and issued yet, and an error when the CSR was rejected or the
// issued certificate cannot be loaded. The outputs are left untouched unless
// true is returned.
func verifyCSRAndSetPKI(csr *certificatesv1.CertificateSigningRequest, keyPEM string, certificate, key *string) (bool, error) {
	issued, err := isCertificateSigningRequestApprovedAndIssued(csr)
	switch {
	case err != nil:
		return false, fmt.Errorf("failed to validate csr: %w", err)
	case !issued:
		return false, nil
	}

	// Make sure the issued certificate loads before handing it out.
	certPEM := string(csr.Status.Certificate)
	if _, _, err := pkiutil.LoadCertificate(certPEM, ""); err != nil {
		return false, fmt.Errorf("failed to load certificate: %w", err)
	}

	*certificate, *key = certPEM, keyPEM
	return true, nil
}
11 changes: 11 additions & 0 deletions src/k8s/pkg/k8sd/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ func (e *Endpoints) Endpoints() []rest.Endpoint {
AccessHandler: ValidateWorkerInfoAccessHandler("worker-name", "worker-token"),
},
},
// Certificates
{
Name: "RefreshCerts/Plan",
Path: "k8sd/refresh-certs/plan",
Post: rest.EndpointAction{Handler: e.postRefreshCertsPlan},
},
{
Name: "RefreshCerts/Run",
Path: "k8sd/refresh-certs/run",
Post: rest.EndpointAction{Handler: e.postRefreshCertsRun},
},
// Kubeconfig
{
Name: "Kubeconfig",
Expand Down
Loading
Loading