Skip to content

Commit

Permalink
PreInitChecks: check non-default Kubernetes service ports
Browse files Browse the repository at this point in the history
Previously, we only checked whether the default Kubernetes service ports
were open. However, those ports can be changed or disabled, which we did not
take into account. With this change, we do.
  • Loading branch information
claudiubelu committed Nov 27, 2024
1 parent c391136 commit a423fc8
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 35 deletions.
18 changes: 16 additions & 2 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,14 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
Annotations: response.Annotations,
}

serviceConfigs := types.K8sServiceConfigs{
IsControlPlane: false,
ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg, false); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil {
return fmt.Errorf("pre-init checks failed for worker node: %w", err)
}

Expand Down Expand Up @@ -419,8 +425,16 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
cfg.Certificates.K8sdPublicKey = utils.Pointer(certificates.K8sdPublicKey)
cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey)

serviceConfigs := types.K8sServiceConfigs{
IsControlPlane: true,
ExtraNodeKubeSchedulerArgs: bootstrapConfig.ExtraNodeKubeSchedulerArgs,
ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs,
ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: bootstrapConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg, true); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil {
return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err)
}

Expand Down
11 changes: 10 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/k8sd/pki"
"github.com/canonical/k8s/pkg/k8sd/setup"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
Expand Down Expand Up @@ -137,8 +138,16 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri
return fmt.Errorf("failed to initialize control plane certificates: %w", err)
}

serviceConfigs := types.K8sServiceConfigs{
IsControlPlane: true,
ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs,
ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs,
ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg, true); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil {
return fmt.Errorf("pre-init checks failed for joining node: %w", err)
}

Expand Down
72 changes: 72 additions & 0 deletions src/k8s/pkg/k8sd/types/k8s_service_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package types

import (
"net"
)

const (
// Default values for Kubernetes services.
// Each constant is a port number kept as a string; the K8sServiceConfigs
// getters in this file return it when the corresponding extra service
// argument is not set.
// KubeControllerManagerPort is the fallback for the kube-controller-manager "--secure-port" argument.
KubeControllerManagerPort = "10257"
// KubeSchedulerPort is the fallback for the kube-scheduler "--secure-port" argument.
KubeSchedulerPort = "10259"
// KubeletPort is the fallback for the kubelet "--port" argument.
KubeletPort = "10250"
// KubeletHealthzPort is the fallback for the kubelet "--healthz-port" argument.
KubeletHealthzPort = "10248"
// KubeletReadOnlyPort is the fallback for the kubelet "--read-only-port" argument.
KubeletReadOnlyPort = "10255"
// KubeProxyHealthzPort is returned when the kube-proxy "--healthz-bind-address" argument is unset or empty.
KubeProxyHealthzPort = "10256"
// KubeProxyMetricsPort is returned when the kube-proxy "--metrics-bind-address" argument is unset or empty.
KubeProxyMetricsPort = "10249"
)

// K8sServiceConfigs bundles the node role together with the extra per-node
// arguments of the Kubernetes services, so that the ports those services will
// use can be resolved (see the Get*Port methods).
//
// Each Extra* map is keyed by the argument name including its leading dashes
// (e.g. "--secure-port"). A missing key or a nil value means the argument is
// not set, in which case the getters fall back to the default port.
type K8sServiceConfigs struct {
// IsControlPlane is true for control plane nodes and false for worker nodes.
IsControlPlane bool
// ExtraNodeKubeControllerManagerArgs holds extra kube-controller-manager arguments.
ExtraNodeKubeControllerManagerArgs map[string]*string
// ExtraNodeKubeSchedulerArgs holds extra kube-scheduler arguments.
ExtraNodeKubeSchedulerArgs map[string]*string
// ExtraNodeKubeletArgs holds extra kubelet arguments.
ExtraNodeKubeletArgs map[string]*string
// ExtraNodeKubeProxyArgs holds extra kube-proxy arguments.
ExtraNodeKubeProxyArgs map[string]*string
}

// GetKubeControllerManagerPort returns the kube-controller-manager port taken
// from the "--secure-port" entry of ExtraNodeKubeControllerManagerArgs, or
// KubeControllerManagerPort when that entry is missing or nil.
func (s *K8sServiceConfigs) GetKubeControllerManagerPort() string {
	const flag = "--secure-port"
	args := s.ExtraNodeKubeControllerManagerArgs
	return getConfigOrDefault(args, flag, KubeControllerManagerPort)
}

// GetKubeSchedulerPort returns the kube-scheduler port taken from the
// "--secure-port" entry of ExtraNodeKubeSchedulerArgs, or KubeSchedulerPort
// when that entry is missing or nil.
func (s *K8sServiceConfigs) GetKubeSchedulerPort() string {
	const flag = "--secure-port"
	args := s.ExtraNodeKubeSchedulerArgs
	return getConfigOrDefault(args, flag, KubeSchedulerPort)
}

// GetKubeletPort returns the kubelet port taken from the "--port" entry of
// ExtraNodeKubeletArgs, or KubeletPort when that entry is missing or nil.
func (s *K8sServiceConfigs) GetKubeletPort() string {
	const flag = "--port"
	args := s.ExtraNodeKubeletArgs
	return getConfigOrDefault(args, flag, KubeletPort)
}

// GetKubeletHealthzPort returns the kubelet healthz port taken from the
// "--healthz-port" entry of ExtraNodeKubeletArgs, or KubeletHealthzPort when
// that entry is missing or nil.
func (s *K8sServiceConfigs) GetKubeletHealthzPort() string {
	const flag = "--healthz-port"
	args := s.ExtraNodeKubeletArgs
	return getConfigOrDefault(args, flag, KubeletHealthzPort)
}

// GetKubeletReadOnlyPort returns the kubelet read-only port taken from the
// "--read-only-port" entry of ExtraNodeKubeletArgs, or KubeletReadOnlyPort
// when that entry is missing or nil.
func (s *K8sServiceConfigs) GetKubeletReadOnlyPort() string {
	const flag = "--read-only-port"
	args := s.ExtraNodeKubeletArgs
	return getConfigOrDefault(args, flag, KubeletReadOnlyPort)
}

// GetKubeProxyHealthzPort returns the port part of the kube-proxy
// "--healthz-bind-address" entry of ExtraNodeKubeProxyArgs.
//
// When the entry is missing, nil or empty, KubeProxyHealthzPort is returned.
// Otherwise the value is split with net.SplitHostPort and any error from that
// call is returned unchanged.
func (s *K8sServiceConfigs) GetKubeProxyHealthzPort() (string, error) {
	bindAddress := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--healthz-bind-address", "")
	if bindAddress != "" {
		// Only the port is of interest; the host part is discarded.
		_, port, err := net.SplitHostPort(bindAddress)
		return port, err
	}
	return KubeProxyHealthzPort, nil
}

// GetKubeProxyMetricsPort returns the port part of the kube-proxy
// "--metrics-bind-address" entry of ExtraNodeKubeProxyArgs.
//
// When the entry is missing, nil or empty, KubeProxyMetricsPort is returned.
// Otherwise the value is split with net.SplitHostPort and any error from that
// call is returned unchanged.
func (s *K8sServiceConfigs) GetKubeProxyMetricsPort() (string, error) {
	bindAddress := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--metrics-bind-address", "")
	if bindAddress != "" {
		// Only the port is of interest; the host part is discarded.
		_, port, err := net.SplitHostPort(bindAddress)
		return port, err
	}
	return KubeProxyMetricsPort, nil
}

// getConfigOrDefault looks up optionName in serviceArgs and returns the value
// it points to. defaultValue is returned when serviceArgs is nil, when the
// option is not present, or when it is present with a nil value.
func getConfigOrDefault(serviceArgs map[string]*string, optionName, defaultValue string) string {
	// Indexing a nil map or a missing key yields a nil *string, so a single
	// nil check covers all three "not configured" cases.
	if configured := serviceArgs[optionName]; configured != nil {
		return *configured
	}
	return defaultValue
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/snap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ type Snap interface {

K8sdClient(address string) (k8sd.Client, error) // k8sd client

PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error // pre-init checks before k8s-snap can start
PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error // pre-init checks before k8s-snap can start
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/snap/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error {
return s.SnapctlSetErr
}

func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error {
func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error {
s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config)
return s.PreInitChecksErr
}
Expand Down
55 changes: 34 additions & 21 deletions src/k8s/pkg/snap/snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"

"github.com/canonical/k8s/pkg/client/dqlite"
Expand Down Expand Up @@ -328,11 +329,13 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error {
return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...))
}

func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error {
if err := checkK8sServicePorts(config, isControlPlane); err != nil {
func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error {
if err := checkK8sServicePorts(config, serviceConfigs); err != nil {
return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err)
}

// NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time
// This works around the issue by running them once during the install hook
for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} {
if err := s.runCommand(ctx, []string{filepath.Join(s.snapDir, "bin", binary), "--version"}); err != nil {
return fmt.Errorf("%q binary could not run: %w", binary, err)
Expand All @@ -354,37 +357,47 @@ func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, is
return nil
}

func checkK8sServicePorts(config types.ClusterConfig, isControlPlane bool) error {
// NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time
// This works around the issue by running them once during the install hook
ports := map[string]int{
func checkK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error {
var allErrors []error
ports := map[string]string{
// Default values from official Kubernetes documentation.
"kubelet": 10250,
"kubelet-healthz": 10248,
"kube-proxy-healhz": 10256,
"kube-proxy-metrics": 10249,
"k8s-dqlite": config.Datastore.GetK8sDqlitePort(),
"loadbalancer": config.LoadBalancer.GetBGPPeerPort(),
"kubelet": serviceConfigs.GetKubeletPort(),
"kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(),
"kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(),
"k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()),
"loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()),
}

if isControlPlane {
ports["kube-apiserver"] = config.APIServer.GetSecurePort()
ports["kube-scheduler"] = 10259
ports["kube-controller-manager"] = 10257
if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil {
allErrors = append(allErrors, err)
} else {
ports["kube-apiserver-proxy"] = config.APIServer.GetSecurePort()
ports["kube-proxy-healhz"] = port
}

if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil {
allErrors = append(allErrors, err)
} else {
ports["kube-proxy-metrics"] = port
}

if serviceConfigs.IsControlPlane {
ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort())
ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort()
ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort()
} else {
ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort())
}

var allErrors []error
for service, port := range ports {
if port == 0 {
if port == "0" {
// Some ports may be set to 0 in order to disable them. No need to check.
continue
}
if open, err := utils.IsLocalPortOpen(port); err != nil {
// Could not open port due to error.
allErrors = append(allErrors, fmt.Errorf("could not check port %d (needed by: %s): %w", port, service, err))
allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err))
} else if open {
allErrors = append(allErrors, fmt.Errorf("port %d (needed by: %s) is already in use.", port, service))
allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service))
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/k8s/pkg/snap/snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ func TestSnap(t *testing.T) {
ContainerdBaseDir: containerdDir,
})
conf := types.ClusterConfig{}
serviceConfigs := types.K8sServiceConfigs{}

err = snap.PreInitChecks(context.Background(), conf, true)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs)
g.Expect(err).To(Not(HaveOccurred()))
expectedCalls := []string{}
for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} {
Expand All @@ -154,11 +155,15 @@ func TestSnap(t *testing.T) {
t.Run("Fail port already in use", func(t *testing.T) {
g := NewWithT(t)
// Open a port which will be checked (kubelet).
l, err := net.Listen("tcp", ":10250")
port := "9999"
serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeletArgs: map[string]*string{"--port": &port},
}
l, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
g.Expect(err).To(Not(HaveOccurred()))
defer l.Close()

err = snap.PreInitChecks(context.Background(), conf, true)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs)
g.Expect(err).To(HaveOccurred())
})

Expand All @@ -172,15 +177,15 @@ func TestSnap(t *testing.T) {
f.Close()
defer os.Remove(f.Name())

err = snap.PreInitChecks(context.Background(), conf, true)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs)
g.Expect(err).To(HaveOccurred())
})

t.Run("Fail run command", func(t *testing.T) {
g := NewWithT(t)
mockRunner.Err = fmt.Errorf("some error")

err := snap.PreInitChecks(context.Background(), conf, true)
err := snap.PreInitChecks(context.Background(), conf, serviceConfigs)
g.Expect(err).To(HaveOccurred())
})
})
Expand Down
7 changes: 3 additions & 4 deletions src/k8s/pkg/utils/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"errors"
"net"
"os"
"strconv"
"syscall"
"time"
)

// IsLocalPortOpen checks if the given local port is already open or not.
func IsLocalPortOpen(port int) (bool, error) {
func IsLocalPortOpen(port string) (bool, error) {
if err := checkPort("localhost", port, 500*time.Millisecond); err == nil {
return true, nil
} else if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNREFUSED) {
Expand All @@ -21,8 +20,8 @@ func IsLocalPortOpen(port int) (bool, error) {
}
}

func checkPort(host string, port int, timeout time.Duration) error {
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout)
func checkPort(host, port string, timeout time.Duration) error {
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), timeout)
if err != nil {
return err
}
Expand Down

0 comments on commit a423fc8

Please sign in to comment.