Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checks k8s-related port availability in PreInitChecks #846

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,13 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
Annotations: response.Annotations,
}

serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, false); err != nil {
return fmt.Errorf("pre-init checks failed for worker node: %w", err)
}

Expand Down Expand Up @@ -419,8 +424,15 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
cfg.Certificates.K8sdPublicKey = utils.Pointer(certificates.K8sdPublicKey)
cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey)

serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeSchedulerArgs: bootstrapConfig.ExtraNodeKubeSchedulerArgs,
ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs,
ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: bootstrapConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil {
return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err)
}

Expand Down
10 changes: 9 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
"github.com/canonical/k8s/pkg/k8sd/pki"
"github.com/canonical/k8s/pkg/k8sd/setup"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
Expand Down Expand Up @@ -137,8 +138,15 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri
return fmt.Errorf("failed to initialize control plane certificates: %w", err)
}

serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs,
ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs,
ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs,
ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs,
}

// Pre-init checks
if err := snap.PreInitChecks(ctx, cfg); err != nil {
if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil {
return fmt.Errorf("pre-init checks failed for joining node: %w", err)
}

Expand Down
71 changes: 71 additions & 0 deletions src/k8s/pkg/k8sd/types/k8s_service_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package types

import (
"net"
)

const (
// Default values for Kubernetes services.
KubeControllerManagerPort = "10257"
KubeSchedulerPort = "10259"
KubeletPort = "10250"
KubeletHealthzPort = "10248"
KubeletReadOnlyPort = "10255"
KubeProxyHealthzPort = "10256"
KubeProxyMetricsPort = "10249"
)

type K8sServiceConfigs struct {
ExtraNodeKubeControllerManagerArgs map[string]*string
ExtraNodeKubeSchedulerArgs map[string]*string
ExtraNodeKubeletArgs map[string]*string
ExtraNodeKubeProxyArgs map[string]*string
}

func (s *K8sServiceConfigs) GetKubeControllerManagerPort() string {
return getConfigOrDefault(s.ExtraNodeKubeControllerManagerArgs, "--secure-port", KubeControllerManagerPort)
}

func (s *K8sServiceConfigs) GetKubeSchedulerPort() string {
return getConfigOrDefault(s.ExtraNodeKubeSchedulerArgs, "--secure-port", KubeSchedulerPort)
}

func (s *K8sServiceConfigs) GetKubeletPort() string {
return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--port", KubeletPort)
}

func (s *K8sServiceConfigs) GetKubeletHealthzPort() string {
return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--healthz-port", KubeletHealthzPort)
}

func (s *K8sServiceConfigs) GetKubeletReadOnlyPort() string {
return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--read-only-port", KubeletReadOnlyPort)
}

func (s *K8sServiceConfigs) GetKubeProxyHealthzPort() (string, error) {
address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--healthz-bind-address", "")
if address == "" {
return KubeProxyHealthzPort, nil
}
_, port, err := net.SplitHostPort(address)
return port, err
}

func (s *K8sServiceConfigs) GetKubeProxyMetricsPort() (string, error) {
address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--metrics-bind-address", "")
if address == "" {
return KubeProxyMetricsPort, nil
}
_, port, err := net.SplitHostPort(address)
return port, err
}

func getConfigOrDefault(serviceArgs map[string]*string, optionName, defaultValue string) string {
if serviceArgs == nil {
return defaultValue
} else if val, ok := serviceArgs[optionName]; !ok || val == nil {
return defaultValue
} else {
return *val
}
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/snap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ type Snap interface {

K8sdClient(address string) (k8sd.Client, error) // k8sd client

PreInitChecks(ctx context.Context, config types.ClusterConfig) error // pre-init checks before k8s-snap can start
PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error // pre-init checks before k8s-snap can start
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/snap/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error {
return s.SnapctlSetErr
}

func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error {
func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error {
s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config)
return s.PreInitChecksErr
}
Expand Down
7 changes: 5 additions & 2 deletions src/k8s/pkg/snap/snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/checks"
"github.com/moby/sys/mountinfo"
"gopkg.in/yaml.v2"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -328,8 +329,10 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error {
return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...))
}

func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error {
// TODO: check for available ports for k8s-dqlite, apiserver, containerd, etc
func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error {
if err := checks.CheckK8sServicePorts(config, serviceConfigs, isControlPlane); err != nil {
return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err)
}

// NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time
// This works around the issue by running them once during the install hook
Expand Down
23 changes: 20 additions & 3 deletions src/k8s/pkg/snap/snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package snap_test
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -141,15 +142,31 @@ func TestSnap(t *testing.T) {
ContainerdBaseDir: containerdDir,
})
conf := types.ClusterConfig{}
serviceConfigs := types.K8sServiceConfigs{}

err = snap.PreInitChecks(context.Background(), conf)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(Not(HaveOccurred()))
expectedCalls := []string{}
for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} {
expectedCalls = append(expectedCalls, fmt.Sprintf("testdir/bin/%s --version", binary))
}
g.Expect(mockRunner.CalledWithCommand).To(ConsistOf(expectedCalls))

t.Run("Fail port already in use", func(t *testing.T) {
g := NewWithT(t)
// Open a port which will be checked (kubelet).
port := "9999"
serviceConfigs := types.K8sServiceConfigs{
ExtraNodeKubeletArgs: map[string]*string{"--port": &port},
}
l, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
g.Expect(err).To(Not(HaveOccurred()))
defer l.Close()

err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(HaveOccurred())
})

t.Run("Fail socket exists", func(t *testing.T) {
g := NewWithT(t)
// Create the containerd.sock file, which should cause the check to fail.
Expand All @@ -160,15 +177,15 @@ func TestSnap(t *testing.T) {
f.Close()
defer os.Remove(f.Name())

err = snap.PreInitChecks(context.Background(), conf)
err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(HaveOccurred())
})

t.Run("Fail run command", func(t *testing.T) {
g := NewWithT(t)
mockRunner.Err = fmt.Errorf("some error")

err := snap.PreInitChecks(context.Background(), conf)
err := snap.PreInitChecks(context.Background(), conf, serviceConfigs, true)
g.Expect(err).To(HaveOccurred())
})
})
Expand Down
59 changes: 59 additions & 0 deletions src/k8s/pkg/utils/checks/checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package checks

import (
"errors"
"fmt"
"strconv"

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/utils"
)

// CheckK8sServicePorts verifies that the Kubernetes-related ports are free to be used.
// The ports checked depends on whether a node is a control plane node, or a worker node.
func CheckK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error {
var allErrors []error
ports := map[string]string{
// Default values from official Kubernetes documentation.
"kubelet": serviceConfigs.GetKubeletPort(),
"kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(),
"kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(),
"k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()),
"loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()),
}

if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil {
allErrors = append(allErrors, err)
} else {
ports["kube-proxy-healhz"] = port
}

if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil {
allErrors = append(allErrors, err)
} else {
ports["kube-proxy-metrics"] = port
}

if isControlPlane {
ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort())
ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort()
ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort()
} else {
ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort())
}

for service, port := range ports {
if port == "0" {
// Some ports may be set to 0 in order to disable them. No need to check.
continue
}
if open, err := utils.IsLocalPortOpen(port); err != nil {
// Could not open port due to error.
allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err))
} else if !open {
allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service))
}
}

return errors.Join(allErrors...)
}
21 changes: 21 additions & 0 deletions src/k8s/pkg/utils/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package utils

import (
"errors"
"fmt"
"net"
"syscall"
)

// IsLocalPortOpen checks if the given local port is already open or not.
func IsLocalPortOpen(port string) (bool, error) {
// Without an address, Listen will listen on all addresses.
if l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)); errors.Is(err, syscall.EADDRINUSE) {
return false, nil
} else if err != nil {
return false, err
} else {
l.Close()
return true, nil
}
}
Loading