From 5b70d88e91b67bef44beb56cca69720a48baa0d8 Mon Sep 17 00:00:00 2001 From: David Przybilla Date: Mon, 6 Jun 2022 17:32:11 +0900 Subject: [PATCH] Start sandbox logic shared for demo/sandbox cmds #minor (#326) --- .../subcommand/sandbox/sandbox_config.go | 42 +- flytectl/cmd/demo/demo.go | 4 +- flytectl/cmd/demo/start.go | 272 +---------- flytectl/cmd/demo/start_test.go | 425 ----------------- flytectl/cmd/sandbox/sandbox.go | 4 +- flytectl/cmd/sandbox/start.go | 250 +--------- flytectl/cmd/sandbox/start_test.go | 427 ----------------- flytectl/pkg/docker/docker.go | 30 ++ flytectl/pkg/docker/docker_util.go | 10 +- flytectl/pkg/docker/docker_util_test.go | 10 +- .../docker}/imagepullpolicy_enumer.go | 2 +- flytectl/pkg/sandbox/start.go | 308 ++++++++++++ flytectl/pkg/sandbox/start_test.go | 438 ++++++++++++++++++ 13 files changed, 806 insertions(+), 1416 deletions(-) rename flytectl/{cmd/config/subcommand/sandbox => pkg/docker}/imagepullpolicy_enumer.go (99%) create mode 100644 flytectl/pkg/sandbox/start.go create mode 100644 flytectl/pkg/sandbox/start_test.go diff --git a/flytectl/cmd/config/subcommand/sandbox/sandbox_config.go b/flytectl/cmd/config/subcommand/sandbox/sandbox_config.go index 7381a398b7..44f76f614c 100644 --- a/flytectl/cmd/config/subcommand/sandbox/sandbox_config.go +++ b/flytectl/cmd/config/subcommand/sandbox/sandbox_config.go @@ -1,34 +1,6 @@ package sandbox -//go:generate enumer -type=ImagePullPolicy -trimprefix=ImagePullPolicy --json -type ImagePullPolicy int - -const ( - ImagePullPolicyAlways ImagePullPolicy = iota - ImagePullPolicyIfNotPresent - ImagePullPolicyNever -) - -// Set implements PFlag's Value interface to attempt to set the value of the flag from string. -func (i *ImagePullPolicy) Set(val string) error { - policy, err := ImagePullPolicyString(val) - if err != nil { - return err - } - - *i = policy - return nil -} - -// Type implements PFlag's Value interface to return type name. -func (i ImagePullPolicy) Type() string { - return "ImagePullPolicy" -} - -//go:generate pflags Config --default-var DefaultConfig --bind-default-var -var ( - DefaultConfig = &Config{} -) +import "github.com/flyteorg/flytectl/pkg/docker" //Config holds configuration flags for sandbox command. type Config struct { @@ -52,12 +24,12 @@ type Config struct { // Optionally it is possible to use local sandbox image // Flytectl will not pull the image from the registry if the local flag passes. It is usually useful while testing your local images without pushing them to a registry. - ImagePullPolicy ImagePullPolicy `json:"imagePullPolicy" pflag:",Optional. Defines the image pull behavior [Always/IfNotPresent/Never]"` + ImagePullPolicy docker.ImagePullPolicy `json:"imagePullPolicy" pflag:",Optional. Defines the image pull behavior [Always/IfNotPresent/Never]"` - ImagePullOptions ImagePullOptions `json:"imagePullOptions" pflag:",Optional. Defines image pull options (e.g. auth)"` + ImagePullOptions docker.ImagePullOptions `json:"imagePullOptions" pflag:",Optional. Defines image pull options (e.g. auth)"` } -type ImagePullOptions struct { - RegistryAuth string `json:"registryAuth" pflag:",The base64 encoded credentials for the registry."` - Platform string `json:"platform" pflag:",Forces a specific platform's image to be pulled.'"` -} +//go:generate pflags Config --default-var DefaultConfig --bind-default-var +var ( + DefaultConfig = &Config{} +) diff --git a/flytectl/cmd/demo/demo.go b/flytectl/cmd/demo/demo.go index 83cb8afe11..30b29c8040 100644 --- a/flytectl/cmd/demo/demo.go +++ b/flytectl/cmd/demo/demo.go @@ -1,7 +1,7 @@ package demo import ( - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" cmdcore "github.com/flyteorg/flytectl/cmd/core" "github.com/spf13/cobra" ) @@ -46,7 +46,7 @@ func CreateDemoCommand() *cobra.Command { demoResourcesFuncs := map[string]cmdcore.CommandEntry{ "start": {CmdFunc: startDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true, Short: startShort, - Long: startLong, PFlagProvider: sandboxConfig.DefaultConfig}, + Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig}, "teardown": {CmdFunc: teardownDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true, Short: teardownShort, Long: teardownLong}, diff --git a/flytectl/cmd/demo/start.go b/flytectl/cmd/demo/start.go index 4cb13d54e3..247111ffca 100644 --- a/flytectl/cmd/demo/start.go +++ b/flytectl/cmd/demo/start.go @@ -1,33 +1,12 @@ package demo import ( - "bufio" "context" - "fmt" - "io" - "os" - "path/filepath" - "time" - "github.com/flyteorg/flytectl/clierrors" - "github.com/flyteorg/flytectl/pkg/github" + "github.com/flyteorg/flytectl/pkg/sandbox" - "github.com/avast/retry-go" - "github.com/olekukonko/tablewriter" - corev1api "k8s.io/api/core/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - - "github.com/docker/docker/api/types/mount" - "github.com/flyteorg/flytectl/pkg/configutil" - "github.com/flyteorg/flytectl/pkg/k8s" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/enescakir/emoji" - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" cmdCore "github.com/flyteorg/flytectl/cmd/core" - "github.com/flyteorg/flytectl/pkg/docker" - "github.com/flyteorg/flytectl/pkg/util" - "k8s.io/client-go/tools/clientcmd" ) const ( @@ -93,252 +72,11 @@ eg : for passing multiple environment variables Usage ` - k8sEndpoint = "https://127.0.0.1:30086" - flyteNamespace = "flyte" - diskPressureTaint = "node.kubernetes.io/disk-pressure" - taintEffect = "NoSchedule" - demoContextName = "flyte-sandbox" - demoDockerContext = "default" - demoImageName = "cr.flyte.org/flyteorg/flyte-sandbox-lite" + demoContextName = "flyte-sandbox" ) -type ExecResult struct { - StdOut string - StdErr string - ExitCode int -} - -func primeFlytekitPod(ctx context.Context, podService corev1.PodInterface) { - _, err := podService.Create(ctx, &corev1api.Pod{ - ObjectMeta: v1.ObjectMeta{ - Name: "py39-cacher", - }, - Spec: corev1api.PodSpec{ - RestartPolicy: corev1api.RestartPolicyNever, - Containers: []corev1api.Container{ - { - Name: "flytekit", - Image: "ghcr.io/flyteorg/flytekit:py3.9-latest", - Command: []string{"echo"}, - Args: []string{"Flyte"}, - }, - }, - }, - }, v1.CreateOptions{}) - if err != nil { - fmt.Printf("Failed to create primer pod - %s", err) - } -} - func startDemoCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { - cli, err := docker.GetDockerClient() - if err != nil { - return err - } - - ghRepo := github.GetGHRepoService() - - reader, err := startDemo(ctx, cli, ghRepo, os.Stdin) - if err != nil { - return err - } - if reader != nil { - docker.WaitForSandbox(reader, docker.SuccessMessage) - } - - if reader != nil { - var k8sClient k8s.K8s - err = retry.Do( - func() error { - k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, k8sEndpoint) - return err - }, - retry.Attempts(10), - ) - if err != nil { - return err - } - if err = updateLocalKubeContext(); err != nil { - return err - } - - if err := watchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil { - return err - } - primeFlytekitPod(ctx, k8sClient.CoreV1().Pods("default")) - util.PrintDemoMessage(util.DemoConsolePort) - } - return nil -} - -func updateLocalKubeContext() error { - srcConfigAccess := &clientcmd.PathOptions{ - GlobalFile: docker.Kubeconfig, - LoadingRules: clientcmd.NewDefaultClientConfigLoadingRules(), - } - k8sCtxMgr := k8s.NewK8sContextManager() - return k8sCtxMgr.CopyContext(srcConfigAccess, demoDockerContext, demoContextName) -} - -func startDemo(ctx context.Context, cli docker.Docker, g github.GHRepoService, reader io.Reader) (*bufio.Scanner, error) { - fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) - - if err := docker.RemoveSandbox(ctx, cli, reader); err != nil { - if err.Error() != clierrors.ErrSandboxExists { - return nil, err - } - fmt.Printf("Existing details of your demo cluster") - util.PrintDemoMessage(util.DemoConsolePort) - return nil, nil - } - - if err := util.SetupFlyteDir(); err != nil { - return nil, err - } - - templateValues := configutil.ConfigTemplateSpec{ - Host: "localhost:30081", - Insecure: true, - } - if err := configutil.SetupConfig(configutil.ConfigFile, configutil.GetTemplate(), templateValues); err != nil { - return nil, err - } - - volumes := docker.Volumes - sandboxDefaultConfig := sandboxConfig.DefaultConfig - if vol, err := mountVolume(sandboxDefaultConfig.Source, docker.Source); err != nil { - return nil, err - } else if vol != nil { - volumes = append(volumes, *vol) - } - demoImage := sandboxConfig.DefaultConfig.Image - if len(demoImage) == 0 { - image, version, err := github.GetFullyQualifiedImageName("sha", sandboxConfig.DefaultConfig.Version, demoImageName, sandboxConfig.DefaultConfig.Prerelease, g) - if err != nil { - return nil, err - } - demoImage = image - fmt.Printf("%v Running Flyte %s release\n", emoji.Whale, version) - } - fmt.Printf("%v pulling docker image for release %s\n", emoji.Whale, demoImage) - if err := docker.PullDockerImage(ctx, cli, demoImage, sandboxConfig.DefaultConfig.ImagePullPolicy, sandboxConfig.DefaultConfig.ImagePullOptions); err != nil { - return nil, err - } - - fmt.Printf("%v booting flyte-demo container\n", emoji.FactoryWorker) - exposedPorts, portBindings, _ := docker.GetDemoPorts() - ID, err := docker.StartContainer(ctx, cli, volumes, exposedPorts, portBindings, docker.FlyteSandboxClusterName, - demoImage, sandboxDefaultConfig.Env) - - if err != nil { - fmt.Printf("%v Something went wrong: Failed to start demo container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale) - return nil, err - } - - logReader, err := docker.ReadLogs(ctx, cli, ID) - if err != nil { - return nil, err - } - - return logReader, nil -} - -func mountVolume(file, destination string) (*mount.Mount, error) { - if len(file) > 0 { - source, err := filepath.Abs(file) - if err != nil { - return nil, err - } - return &mount.Mount{ - Type: mount.TypeBind, - Source: source, - Target: destination, - }, nil - } - return nil, nil -} - -func watchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error { - var data = os.Stdout - table := tablewriter.NewWriter(data) - table.SetHeader([]string{"Service", "Status", "Namespace"}) - table.SetRowLine(true) - - for { - isTaint, err := isNodeTainted(ctx, appsClient) - if err != nil { - return err - } - if isTaint { - return fmt.Errorf("docker sandbox doesn't have sufficient memory available. Please run docker system prune -a --volumes") - } - - pods, err := getFlyteDeployment(ctx, appsClient) - if err != nil { - return err - } - table.ClearRows() - table.SetAutoWrapText(false) - table.SetAutoFormatHeaders(true) - - // Clear os.Stdout - _, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J") - - var total, ready int - total = len(pods.Items) - ready = 0 - if total > 0 { - for _, v := range pods.Items { - if isPodReady(v) { - ready++ - } - if len(v.Status.Conditions) > 0 { - table.Append([]string{v.GetName(), string(v.Status.Phase), v.GetNamespace()}) - } - } - table.Render() - if total == ready { - return nil - } - } else { - table.Append([]string{"k8s: This might take a little bit", "Bootstrapping", ""}) - table.Render() - } - - time.Sleep(10 * time.Second) - } -} - -func isPodReady(v corev1api.Pod) bool { - if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) { - return true - } - return false -} - -func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) { - pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{}) - if err != nil { - return nil, err - } - return pods, nil -} + sandboxDefaultConfig := sandboxCmdConfig.DefaultConfig + return sandbox.StartDemoCluster(ctx, args, sandboxDefaultConfig) -func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, error) { - nodes, err := client.Nodes().List(ctx, v1.ListOptions{}) - if err != nil { - return false, err - } - match := 0 - for _, node := range nodes.Items { - for _, c := range node.Spec.Taints { - if c.Key == diskPressureTaint && c.Effect == taintEffect { - match++ - } - } - } - if match > 0 { - return true, nil - } - return false, nil } diff --git a/flytectl/cmd/demo/start_test.go b/flytectl/cmd/demo/start_test.go index 11ebacc796..bed5a16777 100644 --- a/flytectl/cmd/demo/start_test.go +++ b/flytectl/cmd/demo/start_test.go @@ -1,426 +1 @@ package demo - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "os" - "strings" - "testing" - - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" - cmdCore "github.com/flyteorg/flytectl/cmd/core" - "github.com/flyteorg/flytectl/pkg/docker" - "github.com/flyteorg/flytectl/pkg/docker/mocks" - f "github.com/flyteorg/flytectl/pkg/filesystemutils" - ghMocks "github.com/flyteorg/flytectl/pkg/github/mocks" - "github.com/flyteorg/flytectl/pkg/k8s" - k8sMocks "github.com/flyteorg/flytectl/pkg/k8s/mocks" - "github.com/flyteorg/flytectl/pkg/util" - "github.com/flyteorg/flyteidl/clients/go/admin" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/google/go-github/v42/github" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - testclient "k8s.io/client-go/kubernetes/fake" -) - -var content = ` -apiVersion: v1 -clusters: -- cluster: - server: https://localhost:8080 - extensions: - - name: client.authentication.k8s.io/exec - extension: - audience: foo - other: bar - name: default -contexts: -- context: - cluster: default - user: default - namespace: bar - name: default -current-context: default -kind: Config -users: -- name: default - user: - exec: - apiVersion: client.authentication.k8s.io/v1alpha1 - args: - - arg-1 - - arg-2 - command: foo-command - provideClusterInfo: true -` - -var ( - githubMock *ghMocks.GHRepoService - ctx context.Context - mockDocker *mocks.Docker -) - -var fakeNode = &corev1.Node{ - Spec: corev1.NodeSpec{ - Taints: []corev1.Taint{}, - }, -} - -var fakePod = corev1.Pod{ - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{}, - }, -} - -func demoSetup() { - ctx = context.Background() - mockDocker = &mocks.Docker{} - errCh := make(chan error) - sandboxConfig.DefaultConfig.Version = "v0.19.1" - bodyStatus := make(chan container.ContainerWaitOKBody) - githubMock = &ghMocks.GHRepoService{} - sandboxConfig.DefaultConfig.Image = "dummyimage" - mockDocker.OnContainerCreateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(container.ContainerCreateCreatedBody{ - ID: "Hello", - }, nil) - - mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) -} - -func TestStartDemoFunc(t *testing.T) { - assert.Nil(t, util.SetupFlyteDir()) - assert.Nil(t, os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s"), os.ModePerm)) - assert.Nil(t, ioutil.WriteFile(docker.Kubeconfig, []byte(content), os.ModePerm)) - - fakePod.SetName("flyte") - - t.Run("Successfully run demo cluster", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Successfully exit when demo cluster exist", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ - { - ID: docker.FlyteSandboxClusterName, - Names: []string{ - docker.FlyteSandboxClusterName, - }, - }, - }, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - reader, err := startDemo(ctx, mockDocker, githubMock, strings.NewReader("n")) - assert.Nil(t, err) - assert.Nil(t, reader) - }) - t.Run("Successfully run demo cluster with source code", func(t *testing.T) { - sandboxConfig.DefaultConfig.Source = f.UserHomeDir() - sandboxConfig.DefaultConfig.Version = "" - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Successfully run demo cluster with abs path of source code", func(t *testing.T) { - sandboxConfig.DefaultConfig.Source = "../" - sandboxConfig.DefaultConfig.Version = "" - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Successfully run demo cluster with specific version", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - sandboxConfig.DefaultConfig.Image = "" - tag := "v0.15.0" - githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ - TagName: &tag, - }, nil, nil) - - githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Failed run demo cluster with wrong version", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - sandboxConfig.DefaultConfig.Image = "" - githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("non-existent-tag")) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "non-existent-tag", err.Error()) - }) - t.Run("Error in pulling image", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("failed to pull")) - sandboxConfig.DefaultConfig.Image = "" - tag := "v0.15.0" - githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ - TagName: &tag, - }, nil, nil) - - githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to pull", err.Error()) - }) - t.Run("Error in removing existing cluster", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ - { - ID: docker.FlyteSandboxClusterName, - Names: []string{ - docker.FlyteSandboxClusterName, - }, - }, - }, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("failed to remove container")) - _, err := startDemo(ctx, mockDocker, githubMock, strings.NewReader("y")) - assert.NotNil(t, err) - assert.Equal(t, "failed to remove container", err.Error()) - }) - t.Run("Error in start container", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("failed to run container")) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to run container", err.Error()) - }) - t.Run("Error in reading logs", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, fmt.Errorf("failed to get container logs")) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to get container logs", err.Error()) - }) - t.Run("Error in list container", func(t *testing.T) { - demoSetup() - mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startDemo(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to list containers", err.Error()) - }) - t.Run("Successfully run demo cluster command", func(t *testing.T) { - mockOutStream := new(io.Writer) - cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) - client := testclient.NewSimpleClientset() - k8s.Client = client - _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - fakeNode.SetName("master") - _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - demoSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(mock.Anything, mock.Anything, mock.Anything).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - - stringReader := strings.NewReader(docker.SuccessMessage) - reader := ioutil.NopCloser(stringReader) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(reader, nil) - mockK8sContextMgr := &k8sMocks.ContextOps{} - docker.Client = mockDocker - sandboxConfig.DefaultConfig.Source = "" - sandboxConfig.DefaultConfig.Version = "" - k8s.ContextMgr = mockK8sContextMgr - mockK8sContextMgr.OnCopyContextMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - err = startDemoCluster(ctx, []string{}, cmdCtx) - assert.Nil(t, err) - }) - t.Run("Error in running demo cluster command", func(t *testing.T) { - mockOutStream := new(io.Writer) - cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) - demoSetup() - docker.Client = mockDocker - mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - err := startDemoCluster(ctx, []string{}, cmdCtx) - assert.NotNil(t, err) - }) -} - -func TestMonitorFlyteDeployment(t *testing.T) { - t.Run("Monitor k8s deployment fail because of storage", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - k8s.Client = client - fakePod.SetName("flyte") - fakePod.SetName("flyte") - - _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - fakeNode.SetName("master") - fakeNode.Spec.Taints = append(fakeNode.Spec.Taints, corev1.Taint{ - Effect: "NoSchedule", - Key: "node.kubernetes.io/disk-pressure", - }) - _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - - err = watchFlyteDeployment(ctx, client.CoreV1()) - assert.NotNil(t, err) - - }) - - t.Run("Monitor k8s deployment success", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - k8s.Client = client - fakePod.SetName("flyte") - fakePod.SetName("flyte") - - _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - fakeNode.SetName("master") - fakeNode.Spec.Taints = []corev1.Taint{} - _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - - err = watchFlyteDeployment(ctx, client.CoreV1()) - assert.Nil(t, err) - - }) - -} - -func TestGetFlyteDeploymentCount(t *testing.T) { - - ctx := context.Background() - client := testclient.NewSimpleClientset() - c, err := getFlyteDeployment(ctx, client.CoreV1()) - assert.Nil(t, err) - assert.Equal(t, 0, len(c.Items)) -} - -func TestGetNodeTaintStatus(t *testing.T) { - t.Run("Check node taint with success", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - fakeNode.SetName("master") - _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - c, err := isNodeTainted(ctx, client.CoreV1()) - assert.Nil(t, err) - assert.Equal(t, false, c) - }) - t.Run("Check node taint with fail", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - fakeNode.SetName("master") - _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - node, err := client.CoreV1().Nodes().Get(ctx, "master", v1.GetOptions{}) - if err != nil { - t.Error(err) - } - node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{ - Effect: taintEffect, - Key: diskPressureTaint, - }) - _, err = client.CoreV1().Nodes().Update(ctx, node, v1.UpdateOptions{}) - if err != nil { - t.Error(err) - } - c, err := isNodeTainted(ctx, client.CoreV1()) - assert.Nil(t, err) - assert.Equal(t, true, c) - }) -} diff --git a/flytectl/cmd/sandbox/sandbox.go b/flytectl/cmd/sandbox/sandbox.go index 26e9453f7e..2dc5ab95a4 100644 --- a/flytectl/cmd/sandbox/sandbox.go +++ b/flytectl/cmd/sandbox/sandbox.go @@ -1,7 +1,7 @@ package sandbox import ( - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" cmdcore "github.com/flyteorg/flytectl/cmd/core" "github.com/spf13/cobra" ) @@ -46,7 +46,7 @@ func CreateSandboxCommand() *cobra.Command { sandboxResourcesFuncs := map[string]cmdcore.CommandEntry{ "start": {CmdFunc: startSandboxCluster, Aliases: []string{}, ProjectDomainNotRequired: true, Short: startShort, - Long: startLong, PFlagProvider: sandboxConfig.DefaultConfig}, + Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig}, "teardown": {CmdFunc: teardownSandboxCluster, Aliases: []string{}, ProjectDomainNotRequired: true, Short: teardownShort, Long: teardownLong}, diff --git a/flytectl/cmd/sandbox/start.go b/flytectl/cmd/sandbox/start.go index af09b1f64d..d409637032 100644 --- a/flytectl/cmd/sandbox/start.go +++ b/flytectl/cmd/sandbox/start.go @@ -1,33 +1,11 @@ package sandbox import ( - "bufio" "context" - "fmt" - "io" - "os" - "path/filepath" - "time" - "github.com/flyteorg/flytectl/clierrors" - "github.com/flyteorg/flytectl/pkg/github" - - "github.com/avast/retry-go" - "github.com/olekukonko/tablewriter" - corev1api "k8s.io/api/core/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - - "github.com/docker/docker/api/types/mount" - "github.com/flyteorg/flytectl/pkg/configutil" - "github.com/flyteorg/flytectl/pkg/k8s" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/enescakir/emoji" - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" cmdCore "github.com/flyteorg/flytectl/cmd/core" - "github.com/flyteorg/flytectl/pkg/docker" - "github.com/flyteorg/flytectl/pkg/util" - "k8s.io/client-go/tools/clientcmd" + "github.com/flyteorg/flytectl/pkg/sandbox" ) const ( @@ -98,228 +76,10 @@ eg : for passing multiple environment variables Usage ` - k8sEndpoint = "https://127.0.0.1:30086" - flyteNamespace = "flyte" - diskPressureTaint = "node.kubernetes.io/disk-pressure" - taintEffect = "NoSchedule" - sandboxContextName = "flyte-sandbox" - sandboxDockerContext = "default" - sandboxImageName = "cr.flyte.org/flyteorg/flyte-sandbox" + sandboxContextName = "flyte-sandbox" ) -type ExecResult struct { - StdOut string - StdErr string - ExitCode int -} - func startSandboxCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { - cli, err := docker.GetDockerClient() - if err != nil { - return err - } - - ghRepo := github.GetGHRepoService() - - reader, err := startSandbox(ctx, cli, ghRepo, os.Stdin) - if err != nil { - return err - } - if reader != nil { - docker.WaitForSandbox(reader, docker.SuccessMessage) - } - - if reader != nil { - var k8sClient k8s.K8s - err = retry.Do( - func() error { - k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, k8sEndpoint) - return err - }, - retry.Attempts(10), - ) - if err != nil { - return err - } - if err = updateLocalKubeContext(); err != nil { - return err - } - - if err := watchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil { - return err - } - util.PrintSandboxMessage(util.SandBoxConsolePort) - } - return nil -} - -func updateLocalKubeContext() error { - srcConfigAccess := &clientcmd.PathOptions{ - GlobalFile: docker.Kubeconfig, - LoadingRules: clientcmd.NewDefaultClientConfigLoadingRules(), - } - k8sCtxMgr := k8s.NewK8sContextManager() - return k8sCtxMgr.CopyContext(srcConfigAccess, sandboxDockerContext, sandboxContextName) -} - -func startSandbox(ctx context.Context, cli docker.Docker, g github.GHRepoService, reader io.Reader) (*bufio.Scanner, error) { - fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) - - if err := docker.RemoveSandbox(ctx, cli, reader); err != nil { - if err.Error() != clierrors.ErrSandboxExists { - return nil, err - } - fmt.Printf("Existing details of your sandbox") - util.PrintSandboxMessage(util.SandBoxConsolePort) - return nil, nil - } - - if err := util.SetupFlyteDir(); err != nil { - return nil, err - } - - templateValues := configutil.ConfigTemplateSpec{ - Host: "localhost:30081", - Insecure: true, - } - if err := configutil.SetupConfig(configutil.FlytectlConfig, configutil.GetTemplate(), templateValues); err != nil { - return nil, err - } - - volumes := docker.Volumes - sandboxDefaultConfig := sandboxConfig.DefaultConfig - if vol, err := mountVolume(sandboxDefaultConfig.Source, docker.Source); err != nil { - return nil, err - } else if vol != nil { - volumes = append(volumes, *vol) - } - sandboxImage := sandboxConfig.DefaultConfig.Image - if len(sandboxImage) == 0 { - image, version, err := github.GetFullyQualifiedImageName("dind", sandboxConfig.DefaultConfig.Version, sandboxImageName, sandboxConfig.DefaultConfig.Prerelease, g) - if err != nil { - return nil, err - } - sandboxImage = image - fmt.Printf("%v Running Flyte %s release\n", emoji.Whale, version) - } - fmt.Printf("%v pulling docker image for release %s\n", emoji.Whale, sandboxImage) - if err := docker.PullDockerImage(ctx, cli, sandboxImage, sandboxConfig.DefaultConfig.ImagePullPolicy, sandboxConfig.DefaultConfig.ImagePullOptions); err != nil { - return nil, err - } - - fmt.Printf("%v booting Flyte-sandbox container\n", emoji.FactoryWorker) - exposedPorts, portBindings, _ := docker.GetSandboxPorts() - ID, err := docker.StartContainer(ctx, cli, volumes, exposedPorts, portBindings, docker.FlyteSandboxClusterName, - sandboxImage, sandboxDefaultConfig.Env) - - if err != nil { - fmt.Printf("%v Something went wrong: Failed to start Sandbox container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale) - return nil, err - } - - logReader, err := docker.ReadLogs(ctx, cli, ID) - if err != nil { - return nil, err - } - - return logReader, nil -} - -func mountVolume(file, destination string) (*mount.Mount, error) { - if len(file) > 0 { - source, err := filepath.Abs(file) - if err != nil { - return nil, err - } - return &mount.Mount{ - Type: mount.TypeBind, - Source: source, - Target: destination, - }, nil - } - return nil, nil -} - -func watchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error { - var data = os.Stdout - table := tablewriter.NewWriter(data) - table.SetHeader([]string{"Service", "Status", "Namespace"}) - table.SetRowLine(true) - - for { - isTaint, err := isNodeTainted(ctx, appsClient) - if err != nil { - return err - } - if isTaint { - return fmt.Errorf("docker sandbox doesn't have sufficient memory available. Please run docker system prune -a --volumes") - } - - pods, err := getFlyteDeployment(ctx, appsClient) - if err != nil { - return err - } - table.ClearRows() - table.SetAutoWrapText(false) - table.SetAutoFormatHeaders(true) - - // Clear os.Stdout - _, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J") - - var total, ready int - total = len(pods.Items) - ready = 0 - if total != 0 { - for _, v := range pods.Items { - if isPodReady(v) { - ready++ - } - if len(v.Status.Conditions) > 0 { - table.Append([]string{v.GetName(), string(v.Status.Phase), v.GetNamespace()}) - } - } - table.Render() - if total == ready { - break - } - } - - time.Sleep(40 * time.Second) - } - - return nil -} - -func isPodReady(v corev1api.Pod) bool { - if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) { - return true - } - return false -} - -func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) { - pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{}) - if err != nil { - return nil, err - } - return pods, nil -} - -func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, error) { - nodes, err := client.Nodes().List(ctx, v1.ListOptions{}) - if err != nil { - return false, err - } - match := 0 - for _, node := range nodes.Items { - for _, c := range node.Spec.Taints { - if c.Key == diskPressureTaint && c.Effect == taintEffect { - match++ - } - } - } - if match > 0 { - return true, nil - } - return false, nil + sandboxDefaultConfig := sandboxCmdConfig.DefaultConfig + return sandbox.StartSandboxCluster(ctx, args, sandboxDefaultConfig) } diff --git a/flytectl/cmd/sandbox/start_test.go b/flytectl/cmd/sandbox/start_test.go index 8e1f28c5c8..3bee1abdbc 100644 --- a/flytectl/cmd/sandbox/start_test.go +++ b/flytectl/cmd/sandbox/start_test.go @@ -1,428 +1 @@ package sandbox - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "os" - "strings" - "testing" - - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" - cmdCore "github.com/flyteorg/flytectl/cmd/core" - "github.com/flyteorg/flytectl/pkg/docker" - "github.com/flyteorg/flytectl/pkg/docker/mocks" - f "github.com/flyteorg/flytectl/pkg/filesystemutils" - ghutil "github.com/flyteorg/flytectl/pkg/github" - ghMocks "github.com/flyteorg/flytectl/pkg/github/mocks" - "github.com/flyteorg/flytectl/pkg/k8s" - k8sMocks "github.com/flyteorg/flytectl/pkg/k8s/mocks" - "github.com/flyteorg/flytectl/pkg/util" - "github.com/flyteorg/flyteidl/clients/go/admin" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/google/go-github/v42/github" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - testclient "k8s.io/client-go/kubernetes/fake" -) - -var content = ` -apiVersion: v1 -clusters: -- cluster: - server: https://localhost:8080 - extensions: - - name: client.authentication.k8s.io/exec - extension: - audience: foo - other: bar - name: default -contexts: -- context: - cluster: default - user: default - namespace: bar - name: default -current-context: default -kind: Config -users: -- name: default - user: - exec: - apiVersion: client.authentication.k8s.io/v1alpha1 - args: - - arg-1 - - arg-2 - command: foo-command - provideClusterInfo: true -` - -var fakeNode = &corev1.Node{ - Spec: corev1.NodeSpec{ - Taints: []corev1.Taint{}, - }, -} - -var fakePod = corev1.Pod{ - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{}, - }, -} - -var ( - githubMock *ghMocks.GHRepoService - ctx context.Context - mockDocker *mocks.Docker -) - -func sandboxSetup() { - ctx = context.Background() - mockDocker = &mocks.Docker{} - errCh := make(chan error) - sandboxConfig.DefaultConfig.Version = "v0.19.1" - bodyStatus := make(chan container.ContainerWaitOKBody) - githubMock = &ghMocks.GHRepoService{} - sandboxConfig.DefaultConfig.Image = "dummyimage" - mockDocker.OnContainerCreateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(container.ContainerCreateCreatedBody{ - ID: "Hello", - }, nil) - - mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) -} - -func TestStartFunc(t *testing.T) { - assert.Nil(t, util.SetupFlyteDir()) - assert.Nil(t, os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s"), os.ModePerm)) - assert.Nil(t, ioutil.WriteFile(docker.Kubeconfig, []byte(content), os.ModePerm)) - - fakePod.SetName("flyte") - - t.Run("Successfully run demo cluster", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Successfully exit when demo cluster exist", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ - { - ID: docker.FlyteSandboxClusterName, - Names: []string{ - docker.FlyteSandboxClusterName, - }, - }, - }, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - reader, err := startSandbox(ctx, mockDocker, githubMock, strings.NewReader("n")) - assert.Nil(t, err) - assert.Nil(t, reader) - }) - t.Run("Successfully run demo cluster with source code", func(t *testing.T) { - sandboxConfig.DefaultConfig.Source = f.UserHomeDir() - sandboxConfig.DefaultConfig.Version = "" - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Successfully run demo cluster with abs path of source code", func(t *testing.T) { - sandboxConfig.DefaultConfig.Source = "../" - sandboxConfig.DefaultConfig.Version = "" - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Successfully run demo cluster with specific version", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - sandboxConfig.DefaultConfig.Image = "" - tag := "v0.15.0" - githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ - TagName: &tag, - }, nil, nil) - - githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.Nil(t, err) - }) - t.Run("Failed run demo cluster with wrong version", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - sandboxConfig.DefaultConfig.Image = "" - githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("non-existent-tag")) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "non-existent-tag", err.Error()) - }) - t.Run("Error in pulling image", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("failed to pull")) - sandboxConfig.DefaultConfig.Image = "" - tag := "v0.15.0" - githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ - TagName: &tag, - }, nil, nil) - - githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to pull", err.Error()) - }) - t.Run("Error in removing existing cluster", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ - { - ID: docker.FlyteSandboxClusterName, - Names: []string{ - docker.FlyteSandboxClusterName, - }, - }, - }, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("failed to remove container")) - _, err := startSandbox(ctx, mockDocker, githubMock, strings.NewReader("y")) - assert.NotNil(t, err) - assert.Equal(t, "failed to remove container", err.Error()) - }) - t.Run("Error in start container", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("failed to run container")) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to run container", err.Error()) - }) - t.Run("Error in reading logs", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, fmt.Errorf("failed to get container logs")) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to get container logs", err.Error()) - }) - t.Run("Error in list container", func(t *testing.T) { - sandboxSetup() - mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin) - assert.NotNil(t, err) - assert.Equal(t, "failed to list containers", err.Error()) - }) - t.Run("Successfully run demo cluster command", func(t *testing.T) { - mockOutStream := new(io.Writer) - cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) - client := testclient.NewSimpleClientset() - k8s.Client = client - _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - fakeNode.SetName("master") - _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - sandboxSetup() - mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(mock.Anything, mock.Anything, mock.Anything).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - - stringReader := strings.NewReader(docker.SuccessMessage) - reader := ioutil.NopCloser(stringReader) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(reader, nil) - mockK8sContextMgr := &k8sMocks.ContextOps{} - docker.Client = mockDocker - sandboxConfig.DefaultConfig.Source = "" - sandboxConfig.DefaultConfig.Version = "" - k8s.ContextMgr = mockK8sContextMgr - ghutil.Client = githubMock - mockK8sContextMgr.OnCopyContextMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - err = startSandboxCluster(ctx, []string{}, cmdCtx) - assert.Nil(t, err) - }) - t.Run("Error in running demo cluster command", func(t *testing.T) { - mockOutStream := new(io.Writer) - cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) - sandboxSetup() - docker.Client = mockDocker - mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }).Return(nil, nil) - err := startSandboxCluster(ctx, []string{}, cmdCtx) - assert.NotNil(t, err) - }) -} - -func TestMonitorFlyteDeployment(t *testing.T) { - t.Run("Monitor k8s deployment fail because of storage", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - k8s.Client = client - fakePod.SetName("flyte") - fakePod.SetName("flyte") - - _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - fakeNode.SetName("master") - fakeNode.Spec.Taints = append(fakeNode.Spec.Taints, corev1.Taint{ - Effect: "NoSchedule", - Key: "node.kubernetes.io/disk-pressure", - }) - _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - - err = watchFlyteDeployment(ctx, client.CoreV1()) - assert.NotNil(t, err) - - }) - - t.Run("Monitor k8s deployment success", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - k8s.Client = client - fakePod.SetName("flyte") - fakePod.SetName("flyte") - - _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - fakeNode.SetName("master") - fakeNode.Spec.Taints = []corev1.Taint{} - _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - - err = watchFlyteDeployment(ctx, client.CoreV1()) - assert.Nil(t, err) - - }) - -} - -func TestGetFlyteDeploymentCount(t *testing.T) { - - ctx := context.Background() - client := testclient.NewSimpleClientset() - c, err := getFlyteDeployment(ctx, client.CoreV1()) - assert.Nil(t, err) - assert.Equal(t, 0, len(c.Items)) -} - -func TestGetNodeTaintStatus(t *testing.T) { - t.Run("Check node taint with success", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - fakeNode.SetName("master") - _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - c, err := isNodeTainted(ctx, client.CoreV1()) - assert.Nil(t, err) - assert.Equal(t, false, c) - }) - t.Run("Check node taint with fail", func(t *testing.T) { - ctx := context.Background() - client := testclient.NewSimpleClientset() - fakeNode.SetName("master") - _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) - if err != nil { - t.Error(err) - } - node, err := client.CoreV1().Nodes().Get(ctx, "master", v1.GetOptions{}) - if err != nil { - t.Error(err) - } - node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{ - Effect: taintEffect, - Key: diskPressureTaint, - }) - _, err = client.CoreV1().Nodes().Update(ctx, node, v1.UpdateOptions{}) - if err != nil { - t.Error(err) - } - c, err := isNodeTainted(ctx, client.CoreV1()) - assert.Nil(t, err) - assert.Equal(t, true, c) - }) -} diff --git a/flytectl/pkg/docker/docker.go b/flytectl/pkg/docker/docker.go index cb08092b5f..84f9fb3365 100644 --- a/flytectl/pkg/docker/docker.go +++ b/flytectl/pkg/docker/docker.go @@ -30,3 +30,33 @@ type Docker interface { type FlyteDocker struct { *client.Client } + +//go:generate enumer -type=ImagePullPolicy -trimprefix=ImagePullPolicy --json +type ImagePullPolicy int + +const ( + ImagePullPolicyAlways ImagePullPolicy = iota + ImagePullPolicyIfNotPresent + ImagePullPolicyNever +) + +// Set implements PFlag's Value interface to attempt to set the value of the flag from string. +func (i *ImagePullPolicy) Set(val string) error { + policy, err := ImagePullPolicyString(val) + if err != nil { + return err + } + + *i = policy + return nil +} + +// Type implements PFlag's Value interface to return type name. +func (i ImagePullPolicy) Type() string { + return "ImagePullPolicy" +} + +type ImagePullOptions struct { + RegistryAuth string `json:"registryAuth" pflag:",The base64 encoded credentials for the registry."` + Platform string `json:"platform" pflag:",Forces a specific platform's image to be pulled.'"` +} diff --git a/flytectl/pkg/docker/docker_util.go b/flytectl/pkg/docker/docker_util.go index 8c9d47348f..de7adda9b9 100644 --- a/flytectl/pkg/docker/docker_util.go +++ b/flytectl/pkg/docker/docker_util.go @@ -12,8 +12,6 @@ import ( "github.com/docker/docker/client" "github.com/enescakir/emoji" - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" - "github.com/flyteorg/flytectl/clierrors" "github.com/docker/docker/api/types" @@ -128,11 +126,11 @@ func GetDemoPorts() (map[nat.Port]struct{}, map[nat.Port][]nat.PortBinding, erro } // PullDockerImage will Pull docker image -func PullDockerImage(ctx context.Context, cli Docker, image string, pullPolicy sandboxConfig.ImagePullPolicy, - imagePullOptions sandboxConfig.ImagePullOptions) error { +func PullDockerImage(ctx context.Context, cli Docker, image string, pullPolicy ImagePullPolicy, + imagePullOptions ImagePullOptions) error { - if pullPolicy == sandboxConfig.ImagePullPolicyAlways || pullPolicy == sandboxConfig.ImagePullPolicyIfNotPresent { - if pullPolicy == sandboxConfig.ImagePullPolicyIfNotPresent { + if pullPolicy == ImagePullPolicyAlways || pullPolicy == ImagePullPolicyIfNotPresent { + if pullPolicy == ImagePullPolicyIfNotPresent { imageSummary, err := cli.ImageList(ctx, types.ImageListOptions{}) if err != nil { return err diff --git a/flytectl/pkg/docker/docker_util_test.go b/flytectl/pkg/docker/docker_util_test.go index 37d91f9085..8524443641 100644 --- a/flytectl/pkg/docker/docker_util_test.go +++ b/flytectl/pkg/docker/docker_util_test.go @@ -8,8 +8,6 @@ import ( "strings" "testing" - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" - f "github.com/flyteorg/flytectl/pkg/filesystemutils" "github.com/docker/docker/api/types/container" @@ -106,7 +104,7 @@ func TestPullDockerImage(t *testing.T) { ctx := context.Background() // Verify the attributes mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) - err := PullDockerImage(ctx, mockDocker, "nginx:latest", sandboxConfig.ImagePullPolicyAlways, sandboxConfig.ImagePullOptions{}) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyAlways, ImagePullOptions{}) assert.Nil(t, err) }) @@ -116,7 +114,7 @@ func TestPullDockerImage(t *testing.T) { ctx := context.Background() // Verify the attributes mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("error")) - err := PullDockerImage(ctx, mockDocker, "nginx:latest", sandboxConfig.ImagePullPolicyAlways, sandboxConfig.ImagePullOptions{}) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyAlways, ImagePullOptions{}) assert.NotNil(t, err) }) @@ -127,7 +125,7 @@ func TestPullDockerImage(t *testing.T) { // Verify the attributes mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{}, nil) - err := PullDockerImage(ctx, mockDocker, "nginx:latest", sandboxConfig.ImagePullPolicyIfNotPresent, sandboxConfig.ImagePullOptions{}) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyIfNotPresent, ImagePullOptions{}) assert.Nil(t, err) }) @@ -135,7 +133,7 @@ func TestPullDockerImage(t *testing.T) { setupSandbox() mockDocker := &mocks.Docker{} ctx := context.Background() - err := PullDockerImage(ctx, mockDocker, "nginx:latest", sandboxConfig.ImagePullPolicyNever, sandboxConfig.ImagePullOptions{}) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyNever, ImagePullOptions{}) assert.Nil(t, err) }) } diff --git a/flytectl/cmd/config/subcommand/sandbox/imagepullpolicy_enumer.go b/flytectl/pkg/docker/imagepullpolicy_enumer.go similarity index 99% rename from flytectl/cmd/config/subcommand/sandbox/imagepullpolicy_enumer.go rename to flytectl/pkg/docker/imagepullpolicy_enumer.go index 8416741819..a5f09b9ee8 100644 --- a/flytectl/cmd/config/subcommand/sandbox/imagepullpolicy_enumer.go +++ b/flytectl/pkg/docker/imagepullpolicy_enumer.go @@ -1,7 +1,7 @@ // Code generated by "enumer -type=ImagePullPolicy -trimprefix=ImagePullPolicy --json"; DO NOT EDIT. // -package sandbox +package docker import ( "encoding/json" diff --git a/flytectl/pkg/sandbox/start.go b/flytectl/pkg/sandbox/start.go new file mode 100644 index 0000000000..e934ab5372 --- /dev/null +++ b/flytectl/pkg/sandbox/start.go @@ -0,0 +1,308 @@ +package sandbox + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "github.com/avast/retry-go" + "github.com/docker/docker/api/types/mount" + "github.com/docker/go-connections/nat" + "github.com/enescakir/emoji" + "github.com/flyteorg/flytectl/clierrors" + sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + "github.com/flyteorg/flytectl/pkg/configutil" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/github" + "github.com/flyteorg/flytectl/pkg/k8s" + "github.com/flyteorg/flytectl/pkg/util" + "github.com/kataras/tablewriter" + corev1api "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + flyteNamespace = "flyte" + diskPressureTaint = "node.kubernetes.io/disk-pressure" + taintEffect = "NoSchedule" + sandboxContextName = "flyte-sandbox" + sandboxDockerContext = "default" + k8sEndpoint = "https://127.0.0.1:30086" + sandboxImageName = "cr.flyte.org/flyteorg/flyte-sandbox" + demoImageName = "cr.flyte.org/flyteorg/flyte-sandbox-lite" +) + +func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, error) { + nodes, err := client.Nodes().List(ctx, v1.ListOptions{}) + if err != nil { + return false, err + } + match := 0 + for _, node := range nodes.Items { + for _, c := range node.Spec.Taints { + if c.Key == diskPressureTaint && c.Effect == taintEffect { + match++ + } + } + } + if match > 0 { + return true, nil + } + return false, nil +} + +func isPodReady(v corev1api.Pod) bool { + if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) { + return true + } + return false +} + +func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) { + pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{}) + if err != nil { + return nil, err + } + return pods, nil +} + +func WatchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error { + var data = os.Stdout + table := tablewriter.NewWriter(data) + table.SetHeader([]string{"Service", "Status", "Namespace"}) + table.SetRowLine(true) + + for { + isTaint, err := isNodeTainted(ctx, appsClient) + if err != nil { + return err + } + if isTaint { + return fmt.Errorf("docker sandbox doesn't have sufficient memory available. Please run docker system prune -a --volumes") + } + + pods, err := getFlyteDeployment(ctx, appsClient) + if err != nil { + return err + } + table.ClearRows() + table.SetAutoWrapText(false) + table.SetAutoFormatHeaders(true) + + // Clear os.Stdout + _, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J") + + var total, ready int + total = len(pods.Items) + ready = 0 + if total != 0 { + for _, v := range pods.Items { + if isPodReady(v) { + ready++ + } + if len(v.Status.Conditions) > 0 { + table.Append([]string{v.GetName(), string(v.Status.Phase), v.GetNamespace()}) + } + } + table.Render() + if total == ready { + break + } + } else { + table.Append([]string{"k8s: This might take a little bit", "Bootstrapping", ""}) + table.Render() + } + + time.Sleep(40 * time.Second) + } + + return nil +} + +func MountVolume(file, destination string) (*mount.Mount, error) { + if len(file) > 0 { + source, err := filepath.Abs(file) + if err != nil { + return nil, err + } + return &mount.Mount{ + Type: mount.TypeBind, + Source: source, + Target: destination, + }, nil + } + return nil, nil +} + +func UpdateLocalKubeContext(dockerCtx string, contextName string) error { + srcConfigAccess := &clientcmd.PathOptions{ + GlobalFile: docker.Kubeconfig, + LoadingRules: clientcmd.NewDefaultClientConfigLoadingRules(), + } + k8sCtxMgr := k8s.NewK8sContextManager() + return k8sCtxMgr.CopyContext(srcConfigAccess, dockerCtx, contextName) +} + +func startSandbox(ctx context.Context, cli docker.Docker, g github.GHRepoService, reader io.Reader, sandboxConfig *sandboxCmdConfig.Config, defaultImageName string, defaultImagePrefix string, exposedPorts map[nat.Port]struct{}, portBindings map[nat.Port][]nat.PortBinding, consolePort int) (*bufio.Scanner, error) { + fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) + + if err := docker.RemoveSandbox(ctx, cli, reader); err != nil { + if err.Error() != clierrors.ErrSandboxExists { + return nil, err + } + fmt.Printf("Existing details of your sandbox") + util.PrintSandboxMessage(consolePort) + return nil, nil + } + + if err := util.SetupFlyteDir(); err != nil { + return nil, err + } + + templateValues := configutil.ConfigTemplateSpec{ + Host: "localhost:30081", + Insecure: true, + } + if err := configutil.SetupConfig(configutil.FlytectlConfig, configutil.GetTemplate(), templateValues); err != nil { + return nil, err + } + + volumes := docker.Volumes + if vol, err := MountVolume(sandboxConfig.Source, docker.Source); err != nil { + return nil, err + } else if vol != nil { + volumes = append(volumes, *vol) + } + sandboxImage := sandboxConfig.Image + if len(sandboxImage) == 0 { + image, version, err := github.GetFullyQualifiedImageName(defaultImagePrefix, sandboxConfig.Version, defaultImageName, sandboxConfig.Prerelease, g) + if err != nil { + return nil, err + } + sandboxImage = image + fmt.Printf("%s Fully Qualified image\n", image) + fmt.Printf("%v Running Flyte %s release\n", emoji.Whale, version) + } + fmt.Printf("%v pulling docker image for release %s\n", emoji.Whale, sandboxImage) + if err := docker.PullDockerImage(ctx, cli, sandboxImage, sandboxConfig.ImagePullPolicy, sandboxConfig.ImagePullOptions); err != nil { + return nil, err + } + + fmt.Printf("%v booting Flyte-sandbox container\n", emoji.FactoryWorker) + ID, err := docker.StartContainer(ctx, cli, volumes, exposedPorts, portBindings, docker.FlyteSandboxClusterName, + sandboxImage, sandboxConfig.Env) + + if err != nil { + fmt.Printf("%v Something went wrong: Failed to start Sandbox container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale) + return nil, err + } + + logReader, err := docker.ReadLogs(ctx, cli, ID) + if err != nil { + return nil, err + } + + return logReader, nil +} + +func primeFlytekitPod(ctx context.Context, podService corev1.PodInterface) { + _, err := podService.Create(ctx, &corev1api.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "py39-cacher", + }, + Spec: corev1api.PodSpec{ + RestartPolicy: corev1api.RestartPolicyNever, + Containers: []corev1api.Container{ + { + + Name: "flytekit", + Image: "ghcr.io/flyteorg/flytekit:py3.9-latest", + Command: []string{"echo"}, + Args: []string{"Flyte"}, + }, + }, + }, + }, v1.CreateOptions{}) + if err != nil { + fmt.Printf("Failed to create primer pod - %s", err) + } +} + +func StartCluster(ctx context.Context, args []string, sandboxConfig *sandboxCmdConfig.Config, primePod bool, defaultImageName string, defaultImagePrefix string, exposedPorts map[nat.Port]struct{}, portBindings map[nat.Port][]nat.PortBinding, consolePort int) error { + cli, err := docker.GetDockerClient() + if err != nil { + return err + } + + ghRepo := github.GetGHRepoService() + + reader, err := startSandbox(ctx, cli, ghRepo, os.Stdin, sandboxConfig, defaultImageName, defaultImagePrefix, exposedPorts, portBindings, consolePort) + if err != nil { + return err + } + if reader != nil { + docker.WaitForSandbox(reader, docker.SuccessMessage) + } + + if reader != nil { + var k8sClient k8s.K8s + err = retry.Do( + func() error { + k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, k8sEndpoint) + return err + }, + retry.Attempts(10), + ) + if err != nil { + return err + } + if err = UpdateLocalKubeContext(sandboxDockerContext, sandboxContextName); err != nil { + return err + } + + if err := WatchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil { + return err + } + if primePod { + primeFlytekitPod(ctx, k8sClient.CoreV1().Pods("default")) + } + + } + return nil +} + +func StartDemoCluster(ctx context.Context, args []string, sandboxConfig *sandboxCmdConfig.Config) error { + primePod := true + sandboxImagePrefix := "sha" + exposedPorts, portBindings, err := docker.GetDemoPorts() + if err != nil { + return err + } + err = StartCluster(ctx, args, sandboxConfig, primePod, demoImageName, sandboxImagePrefix, exposedPorts, portBindings, util.DemoConsolePort) + if err != nil { + return err + } + util.PrintDemoMessage(util.DemoConsolePort) + return nil +} + +func StartSandboxCluster(ctx context.Context, args []string, sandboxConfig *sandboxCmdConfig.Config) error { + primePod := false + demoImagePrefix := "dind" + exposedPorts, portBindings, err := docker.GetSandboxPorts() + if err != nil { + return err + } + err = StartCluster(ctx, args, sandboxConfig, primePod, sandboxImageName, demoImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + if err != nil { + return err + } + util.PrintSandboxMessage(util.SandBoxConsolePort) + return nil +} diff --git a/flytectl/pkg/sandbox/start_test.go b/flytectl/pkg/sandbox/start_test.go new file mode 100644 index 0000000000..f4d46b2321 --- /dev/null +++ b/flytectl/pkg/sandbox/start_test.go @@ -0,0 +1,438 @@ +package sandbox + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "strings" + "testing" + + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + f "github.com/flyteorg/flytectl/pkg/filesystemutils" + ghutil "github.com/flyteorg/flytectl/pkg/github" + ghMocks "github.com/flyteorg/flytectl/pkg/github/mocks" + "github.com/flyteorg/flytectl/pkg/k8s" + k8sMocks "github.com/flyteorg/flytectl/pkg/k8s/mocks" + "github.com/flyteorg/flytectl/pkg/util" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + "github.com/google/go-github/v42/github" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testclient "k8s.io/client-go/kubernetes/fake" +) + +var content = ` +apiVersion: v1 +clusters: +- cluster: + server: https://localhost:8080 + extensions: + - name: client.authentication.k8s.io/exec + extension: + audience: foo + other: bar + name: default +contexts: +- context: + cluster: default + user: default + namespace: bar + name: default +current-context: default +kind: Config +users: +- name: default + user: + exec: + apiVersion: client.authentication.k8s.io/v1alpha1 + args: + - arg-1 + - arg-2 + command: foo-command + provideClusterInfo: true +` + +var fakeNode = &corev1.Node{ + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{}, + }, +} + +var fakePod = corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{}, + }, +} + +var ( + githubMock *ghMocks.GHRepoService + ctx context.Context + mockDocker *mocks.Docker +) + +func sandboxSetup() { + ctx = context.Background() + mockDocker = &mocks.Docker{} + errCh := make(chan error) + sandboxCmdConfig.DefaultConfig.Version = "v0.19.1" + bodyStatus := make(chan container.ContainerWaitOKBody) + githubMock = &ghMocks.GHRepoService{} + sandboxCmdConfig.DefaultConfig.Image = "dummyimage" + mockDocker.OnContainerCreateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) +} + +func TestStartFunc(t *testing.T) { + defaultImagePrefix := "dind" + exposedPorts, portBindings, _ := docker.GetSandboxPorts() + config := sandboxCmdConfig.DefaultConfig + config.Image = "dummyimage" + config.ImagePullOptions = docker.ImagePullOptions{ + RegistryAuth: "", + Platform: "", + } + assert.Nil(t, util.SetupFlyteDir()) + assert.Nil(t, os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s"), os.ModePerm)) + assert.Nil(t, ioutil.WriteFile(docker.Kubeconfig, []byte(content), os.ModePerm)) + + fakePod.SetName("flyte") + + t.Run("Successfully run demo cluster", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.Nil(t, err) + }) + t.Run("Successfully exit when demo cluster exist", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + reader, err := startSandbox(ctx, mockDocker, githubMock, strings.NewReader("n"), config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.Nil(t, err) + assert.Nil(t, reader) + }) + t.Run("Successfully run demo cluster with source code", func(t *testing.T) { + sandboxCmdConfig.DefaultConfig.Source = f.UserHomeDir() + sandboxCmdConfig.DefaultConfig.Version = "" + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.Nil(t, err) + }) + t.Run("Successfully run demo cluster with abs path of source code", func(t *testing.T) { + sandboxCmdConfig.DefaultConfig.Source = "../" + sandboxCmdConfig.DefaultConfig.Version = "" + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.Nil(t, err) + }) + t.Run("Successfully run demo cluster with specific version", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + sandboxCmdConfig.DefaultConfig.Image = "" + tag := "v0.15.0" + githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ + TagName: &tag, + }, nil, nil) + + githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.Nil(t, err) + }) + t.Run("Failed run demo cluster with wrong version", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + sandboxCmdConfig.DefaultConfig.Image = "" + githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("non-existent-tag")) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.NotNil(t, err) + assert.Equal(t, "non-existent-tag", err.Error()) + }) + t.Run("Error in pulling image", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("failed to pull")) + sandboxCmdConfig.DefaultConfig.Image = "" + tag := "v0.15.0" + githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ + TagName: &tag, + }, nil, nil) + + githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.NotNil(t, err) + assert.Equal(t, "failed to pull", err.Error()) + }) + t.Run("Error in removing existing cluster", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("failed to remove container")) + _, err := startSandbox(ctx, mockDocker, githubMock, strings.NewReader("y"), sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.NotNil(t, err) + assert.Equal(t, "failed to remove container", err.Error()) + }) + t.Run("Error in start container", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("failed to run container")) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.NotNil(t, err) + assert.Equal(t, "failed to run container", err.Error()) + }) + t.Run("Error in reading logs", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, fmt.Errorf("failed to get container logs")) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.NotNil(t, err) + assert.Equal(t, "failed to get container logs", err.Error()) + }) + t.Run("Error in list container", func(t *testing.T) { + sandboxSetup() + mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + assert.NotNil(t, err) + assert.Equal(t, "failed to list containers", err.Error()) + }) + t.Run("Successfully run demo cluster command", func(t *testing.T) { + // mockOutStream := new(io.Writer) + //cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) + client := testclient.NewSimpleClientset() + k8s.Client = client + _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + fakeNode.SetName("master") + _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + sandboxSetup() + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(mock.Anything, mock.Anything, mock.Anything).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + + stringReader := strings.NewReader(docker.SuccessMessage) + reader := ioutil.NopCloser(stringReader) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(reader, nil) + mockK8sContextMgr := &k8sMocks.ContextOps{} + docker.Client = mockDocker + sandboxCmdConfig.DefaultConfig.Source = "" + sandboxCmdConfig.DefaultConfig.Version = "" + k8s.ContextMgr = mockK8sContextMgr + ghutil.Client = githubMock + mockK8sContextMgr.OnCopyContextMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = StartSandboxCluster(context.Background(), []string{}, config) + assert.Nil(t, err) + err = StartDemoCluster(context.Background(), []string{}, config) + assert.Nil(t, err) + }) + t.Run("Error in running demo cluster command", func(t *testing.T) { + //mockOutStream := new(io.Writer) + //cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) + sandboxSetup() + docker.Client = mockDocker + mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + err := StartSandboxCluster(context.Background(), []string{}, config) + assert.NotNil(t, err) + err = StartDemoCluster(context.Background(), []string{}, config) + assert.NotNil(t, err) + }) +} + +func TestMonitorFlyteDeployment(t *testing.T) { + t.Run("Monitor k8s deployment fail because of storage", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + k8s.Client = client + fakePod.SetName("flyte") + fakePod.SetName("flyte") + + _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + fakeNode.SetName("master") + fakeNode.Spec.Taints = append(fakeNode.Spec.Taints, corev1.Taint{ + Effect: "NoSchedule", + Key: "node.kubernetes.io/disk-pressure", + }) + _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + + err = WatchFlyteDeployment(ctx, client.CoreV1()) + assert.NotNil(t, err) + + }) + + t.Run("Monitor k8s deployment success", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + k8s.Client = client + fakePod.SetName("flyte") + fakePod.SetName("flyte") + + _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + fakeNode.SetName("master") + fakeNode.Spec.Taints = []corev1.Taint{} + _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + + err = WatchFlyteDeployment(ctx, client.CoreV1()) + assert.Nil(t, err) + + }) + +} + +func TestGetFlyteDeploymentCount(t *testing.T) { + + ctx := context.Background() + client := testclient.NewSimpleClientset() + c, err := getFlyteDeployment(ctx, client.CoreV1()) + assert.Nil(t, err) + assert.Equal(t, 0, len(c.Items)) +} + +func TestGetNodeTaintStatus(t *testing.T) { + t.Run("Check node taint with success", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + fakeNode.SetName("master") + _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + c, err := isNodeTainted(ctx, client.CoreV1()) + assert.Nil(t, err) + assert.Equal(t, false, c) + }) + t.Run("Check node taint with fail", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + fakeNode.SetName("master") + _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + node, err := client.CoreV1().Nodes().Get(ctx, "master", v1.GetOptions{}) + if err != nil { + t.Error(err) + } + node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{ + Effect: taintEffect, + Key: diskPressureTaint, + }) + _, err = client.CoreV1().Nodes().Update(ctx, node, v1.UpdateOptions{}) + if err != nil { + t.Error(err) + } + c, err := isNodeTainted(ctx, client.CoreV1()) + assert.Nil(t, err) + assert.Equal(t, true, c) + }) +}