Skip to content

Commit

Permalink
Checks k8s-related port availability in PreInitChecks (#846)
Browse files Browse the repository at this point in the history
PreInitChecks is called on bootstrap or when joining another Kubernetes
cluster. Kubernetes and its services open up several ports; if they're
already in use, we cannot progress.

Adding these checks will make these error cases more explainable to the
user, rather than a generic bootstrap / join error.
  • Loading branch information
claudiubelu authored Dec 3, 2024
1 parent 2e8fcb1 commit 046f45f
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 10 deletions.
16 changes: 14 additions & 2 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,13 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
Annotations: response.Annotations,
}

serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, false); err != nil {
return fmt.Errorf("pre-init checks failed for worker node: %w", err)
}

Expand Down Expand Up @@ -419,8 +424,15 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
cfg.Certificates.K8sdPublicKey = utils.Pointer(certificates.K8sdPublicKey)
cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey)

serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeSchedulerArgs: bootstrapConfig.ExtraNodeKubeSchedulerArgs,
ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs,
ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: bootstrapConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil {
return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err)
}

Expand Down
10 changes: 9 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/k8sd/pki"
"github.com/canonical/k8s/pkg/k8sd/setup"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
Expand Down Expand Up @@ -137,8 +138,15 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri
return fmt.Errorf("failed to initialize control plane certificates: %w", err)
}

serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs,
ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs,
ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil {
return fmt.Errorf("pre-init checks failed for joining node: %w", err)
}

Expand Down
71 changes: 71 additions & 0 deletions src/k8s/pkg/k8sd/types/k8s_service_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package types

import (
"net"
)

const (
// Default values for Kubernetes services.
KubeControllerManagerPort = "10257"
KubeSchedulerPort = "10259"
KubeletPort = "10250"
KubeletHealthzPort = "10248"
KubeletReadOnlyPort = "10255"
KubeProxyHealthzPort = "10256"
KubeProxyMetricsPort = "10249"
)

type K8sServiceConfigs struct {
ExtraNodeKubeControllerManagerArgs map[string]*string
ExtraNodeKubeSchedulerArgs map[string]*string
ExtraNodeKubeletArgs map[string]*string
ExtraNodeKubeProxyArgs map[string]*string
}

func (s *K8sServiceConfigs) GetKubeControllerManagerPort() string {
return getConfigOrDefault(s.ExtraNodeKubeControllerManagerArgs, "--secure-port", KubeControllerManagerPort)
}

func (s *K8sServiceConfigs) GetKubeSchedulerPort() string {
return getConfigOrDefault(s.ExtraNodeKubeSchedulerArgs, "--secure-port", KubeSchedulerPort)
}

func (s *K8sServiceConfigs) GetKubeletPort() string {
return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--port", KubeletPort)
}

func (s *K8sServiceConfigs) GetKubeletHealthzPort() string {
return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--healthz-port", KubeletHealthzPort)
}

func (s *K8sServiceConfigs) GetKubeletReadOnlyPort() string {
return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--read-only-port", KubeletReadOnlyPort)
}

func (s *K8sServiceConfigs) GetKubeProxyHealthzPort() (string, error) {
address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--healthz-bind-address", "")
if address == "" {
return KubeProxyHealthzPort, nil
}
_, port, err := net.SplitHostPort(address)
return port, err
}

func (s *K8sServiceConfigs) GetKubeProxyMetricsPort() (string, error) {
address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--metrics-bind-address", "")
if address == "" {
return KubeProxyMetricsPort, nil
}
_, port, err := net.SplitHostPort(address)
return port, err
}

func getConfigOrDefault(serviceArgs map[string]*string, optionName, defaultValue string) string {
if serviceArgs == nil {
return defaultValue
} else if val, ok := serviceArgs[optionName]; !ok || val == nil {
return defaultValue
} else {
return *val
}
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/snap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ type Snap interface {

K8sdClient(address string) (k8sd.Client, error) // k8sd client

PreInitChecks(ctx context.Context, config types.ClusterConfig) error // pre-init checks before k8s-snap can start
PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error // pre-init checks before k8s-snap can start
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/snap/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error {
return s.SnapctlSetErr
}

func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error {
func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error {
s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config)
return s.PreInitChecksErr
}
Expand Down
7 changes: 5 additions & 2 deletions src/k8s/pkg/snap/snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/checks"
"github.com/moby/sys/mountinfo"
"gopkg.in/yaml.v2"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -328,8 +329,10 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error {
return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...))
}

func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error {
// TODO: check for available ports for k8s-dqlite, apiserver, containerd, etc
func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error {
if err := checks.CheckK8sServicePorts(config, serviceConfigs, isControlPlane); err != nil {
return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err)
}

// NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time
// This works around the issue by running them once during the install hook
Expand Down
23 changes: 20 additions & 3 deletions src/k8s/pkg/snap/snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package snap_test
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -141,15 +142,31 @@ func TestSnap(t *testing.T) {
ContainerdBaseDir: containerdDir,
})
conf := types.ClusterConfig{}
serviceConfigs := types.K8sServiceConfigs{}

err = snap.PreInitChecks(context.Background(), conf)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(Not(HaveOccurred()))
expectedCalls := []string{}
for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} {
expectedCalls = append(expectedCalls, fmt.Sprintf("testdir/bin/%s --version", binary))
}
g.Expect(mockRunner.CalledWithCommand).To(ConsistOf(expectedCalls))

t.Run("Fail port already in use", func(t *testing.T) {
g := NewWithT(t)
// Open a port which will be checked (kubelet).
port := "9999"
serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeletArgs: map[string]*string{"--port": &port},
}
l, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
g.Expect(err).To(Not(HaveOccurred()))
defer l.Close()

err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(HaveOccurred())
})

t.Run("Fail socket exists", func(t *testing.T) {
g := NewWithT(t)
// Create the containerd.sock file, which should cause the check to fail.
Expand All @@ -160,15 +177,15 @@ func TestSnap(t *testing.T) {
f.Close()
defer os.Remove(f.Name())

err = snap.PreInitChecks(context.Background(), conf)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(HaveOccurred())
})

t.Run("Fail run command", func(t *testing.T) {
g := NewWithT(t)
mockRunner.Err = fmt.Errorf("some error")

err := snap.PreInitChecks(context.Background(), conf)
err := snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(HaveOccurred())
})
})
Expand Down
59 changes: 59 additions & 0 deletions src/k8s/pkg/utils/checks/checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package checks

import (
"errors"
"fmt"
"strconv"

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/utils"
)

// CheckK8sServicePorts verifies that the Kubernetes-related ports are free to be used.
// The ports checked depends on whether a node is a control plane node, or a worker node.
func CheckK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error {
var allErrors []error
ports := map[string]string{
// Default values from official Kubernetes documentation.
"kubelet": serviceConfigs.GetKubeletPort(),
"kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(),
"kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(),
"k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()),
"loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()),
}

if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil {
allErrors = append(allErrors, err)
} else {
ports["kube-proxy-healhz"] = port
}

if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil {
allErrors = append(allErrors, err)
} else {
ports["kube-proxy-metrics"] = port
}

if isControlPlane {
ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort())
ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort()
ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort()
} else {
ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort())
}

for service, port := range ports {
if port == "0" {
// Some ports may be set to 0 in order to disable them. No need to check.
continue
}
if open, err := utils.IsLocalPortOpen(port); err != nil {
// Could not open port due to error.
allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err))
} else if !open {
allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service))
}
}

return errors.Join(allErrors...)
}
21 changes: 21 additions & 0 deletions src/k8s/pkg/utils/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package utils

import (
"errors"
"fmt"
"net"
"syscall"
)

// IsLocalPortOpen checks if the given local port is already open or not.
func IsLocalPortOpen(port string) (bool, error) {
// Without an address, Listen will listen on all addresses.
if l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)); errors.Is(err, syscall.EADDRINUSE) {
return false, nil
} else if err != nil {
return false, err
} else {
l.Close()
return true, nil
}
}

0 comments on commit 046f45f

Please sign in to comment.