diff --git a/flytectl/.github/workflows/generate-docs.yaml b/flytectl/.github/workflows/generate-docs.yaml index 21c0aad628..1c442ea4f8 100644 --- a/flytectl/.github/workflows/generate-docs.yaml +++ b/flytectl/.github/workflows/generate-docs.yaml @@ -6,7 +6,7 @@ on: - master jobs: - build: + generate-docs: name: Generate documentation runs-on: ubuntu-latest steps: diff --git a/flytectl/.github/workflows/sandbox.yaml b/flytectl/.github/workflows/sandbox.yaml new file mode 100644 index 0000000000..35c10cee19 --- /dev/null +++ b/flytectl/.github/workflows/sandbox.yaml @@ -0,0 +1,28 @@ +name: Test Getting started + +on: + pull_request: + branches: + - master + +jobs: + sandbox: + name: Test Getting started + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + - uses: actions/cache@v2 + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }} + - name: Build Flytectl binary + run: make compile + - name: Create a sandbox cluster + run: bin/flytectl sandbox start + - name: Register cookbook + run: bin/flytectl register examples -d development -p flytesnacks || true + - name: Teardown Sandbox cluster + run: bin/flytectl sandbox teardown diff --git a/flytectl/cmd/sandbox/sandbox_util.go b/flytectl/cmd/sandbox/sandbox_util.go deleted file mode 100644 index e6c0d43944..0000000000 --- a/flytectl/cmd/sandbox/sandbox_util.go +++ /dev/null @@ -1,159 +0,0 @@ -package sandbox - -import ( - "bufio" - "context" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - "strings" - - cmdUtil "github.com/flyteorg/flytectl/pkg/commandutils" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/mount" - "github.com/docker/docker/client" - "github.com/docker/go-connections/nat" - "github.com/enescakir/emoji" - f "github.com/flyteorg/flytectl/pkg/filesystemutils" -) - -var ( - Kubeconfig = f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s", "k3s.yaml") - FlytectlConfig = f.FilePathJoin(f.UserHomeDir(), ".flyte", "config-sandbox.yaml") - SuccessMessage = "Flyte is ready! Flyte UI is available at http://localhost:30081/console" - ImageName = "ghcr.io/flyteorg/flyte-sandbox:dind" - flyteSandboxClusterName = "flyte-sandbox" - Environment = []string{"SANDBOX=1", "KUBERNETES_API_PORT=30086", "FLYTE_HOST=localhost:30081", "FLYTE_AWS_ENDPOINT=http://localhost:30084"} - flyteSnackDir = "/usr/src" - K3sDir = "/etc/rancher/" -) - -func setupFlytectlConfig() error { - - _ = os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte"), 0755) - - response, err := http.Get("https://raw.githubusercontent.com/flyteorg/flytectl/master/config.yaml") - if err != nil { - return err - } - defer response.Body.Close() - - data, err := ioutil.ReadAll(response.Body) - if err != nil { - return err - } - - _ = ioutil.WriteFile(FlytectlConfig, data, 0600) - return nil -} - -func configCleanup() error { - err := os.Remove(FlytectlConfig) - if err != nil { - return err - } - err = os.RemoveAll(f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s")) - if err != nil { - return err - } - return nil -} - -func getSandbox(cli *client.Client) *types.Container { - containers, _ := cli.ContainerList(context.Background(), types.ContainerListOptions{ - All: true, - }) - for _, v := range containers { - if strings.Contains(v.Names[0], flyteSandboxClusterName) { - return &v - } - } - return nil -} - -func removeSandboxIfExist(cli *client.Client, reader io.Reader) error { - if c := getSandbox(cli); c != nil { - if cmdUtil.AskForConfirmation("delete existing sandbox cluster", reader) { - err := cli.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ - Force: true, - }) - return err - } - os.Exit(0) - } - return nil -} - -func startContainer(cli *client.Client, volumes []mount.Mount) (string, error) { - ExposedPorts, PortBindings, _ := nat.ParsePortSpecs([]string{ - "127.0.0.1:30086:30086", - "127.0.0.1:30081:30081", - "127.0.0.1:30082:30082", - "127.0.0.1:30084:30084", - }) - r, err := cli.ImagePull(context.Background(), ImageName, types.ImagePullOptions{}) - if err != nil { - return "", err - } - _, _ = io.Copy(os.Stdout, r) - resp, err := cli.ContainerCreate(context.Background(), &container.Config{ - Env: Environment, - Image: ImageName, - Tty: false, - ExposedPorts: ExposedPorts, - }, &container.HostConfig{ - Mounts: volumes, - PortBindings: PortBindings, - Privileged: true, - }, nil, - nil, flyteSandboxClusterName) - - if err != nil { - return "", err - } - go watchError(cli, resp.ID) - if err := cli.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil { - return "", err - } - return resp.ID, nil -} - -func watchError(cli *client.Client, id string) { - statusCh, errCh := cli.ContainerWait(context.Background(), id, container.WaitConditionNotRunning) - - select { - case err := <-errCh: - if err != nil { - panic(err) - } - case <-statusCh: - } -} - -func readLogs(cli *client.Client, id, message string) error { - reader, err := cli.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{ - ShowStderr: true, - ShowStdout: true, - Timestamps: true, - Follow: true, - }) - if err != nil { - return err - } - scanner := bufio.NewScanner(reader) - - for scanner.Scan() { - if strings.Contains(scanner.Text(), message) { - fmt.Printf("%v %v %v %v %v \n", emoji.ManTechnologist, message, emoji.Rocket, emoji.Rocket, emoji.PartyPopper) - fmt.Printf("Please visit https://github.com/flyteorg/flytesnacks for more example %v \n", emoji.Rocket) - fmt.Printf("Register all flytesnacks example by running 'flytectl register examples -d development -p flytesnacks' \n") - break - } - fmt.Println(scanner.Text()) - } - return nil -} diff --git a/flytectl/cmd/sandbox/sandbox_util_test.go b/flytectl/cmd/sandbox/sandbox_util_test.go deleted file mode 100644 index ab7e981b98..0000000000 --- a/flytectl/cmd/sandbox/sandbox_util_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package sandbox - -import ( - "context" - "io/ioutil" - "os" - "strings" - "testing" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/mount" - "github.com/docker/docker/client" - sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" - cmdCore "github.com/flyteorg/flytectl/cmd/core" - u "github.com/flyteorg/flytectl/cmd/testutils" - - f "github.com/flyteorg/flytectl/pkg/filesystemutils" - - "github.com/stretchr/testify/assert" -) - -var ( - cmdCtx cmdCore.CommandContext -) - -func cleanup(client *client.Client) error { - containers, err := client.ContainerList(context.Background(), types.ContainerListOptions{ - All: true, - }) - if err != nil { - return err - } - for _, v := range containers { - if strings.Contains(v.Names[0], flyteSandboxClusterName) { - if err := client.ContainerRemove(context.Background(), v.ID, types.ContainerRemoveOptions{ - Force: true, - }); err != nil { - return err - } - } - } - return nil -} - -func setupSandbox() { - mockAdminClient := u.MockClient - cmdCtx = cmdCore.NewCommandContext(mockAdminClient, u.MockOutStream) - _ = setupFlytectlConfig() -} - -func TestConfigCleanup(t *testing.T) { - _, err := os.Stat(f.FilePathJoin(f.UserHomeDir(), ".flyte")) - if os.IsNotExist(err) { - _ = os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte"), 0755) - } - _ = ioutil.WriteFile(FlytectlConfig, []byte("string"), 0600) - _ = ioutil.WriteFile(Kubeconfig, []byte("string"), 0600) - - err = configCleanup() - assert.Nil(t, err) - - _, err = os.Stat(FlytectlConfig) - check := os.IsNotExist(err) - assert.Equal(t, check, true) - - _, err = os.Stat(Kubeconfig) - check = os.IsNotExist(err) - assert.Equal(t, check, true) - _ = configCleanup() -} - -func TestSetupFlytectlConfig(t *testing.T) { - _, err := os.Stat(f.FilePathJoin(f.UserHomeDir(), ".flyte")) - if os.IsNotExist(err) { - _ = os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte"), 0755) - } - err = setupFlytectlConfig() - assert.Nil(t, err) - _, err = os.Stat(FlytectlConfig) - assert.Nil(t, err) - check := os.IsNotExist(err) - assert.Equal(t, check, false) - _ = configCleanup() - -} - -func TestTearDownSandbox(t *testing.T) { - setupSandbox() - cli, _ := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - err := teardownSandboxCluster(context.Background(), []string{}, cmdCtx) - assert.Nil(t, err) - assert.Nil(t, cleanup(cli)) - - volumes = []mount.Mount{} - _ = startSandboxCluster(context.Background(), []string{}, cmdCtx) - err = teardownSandboxCluster(context.Background(), []string{}, cmdCtx) - assert.Nil(t, err) - -} - -func TestStartSandbox(t *testing.T) { - cli, _ := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - - assert.Nil(t, cleanup(cli)) - setupSandbox() - volumes = []mount.Mount{} - sandboxConfig.DefaultConfig.SnacksRepo = "/tmp" - err := startSandboxCluster(context.Background(), []string{}, cmdCtx) - assert.Nil(t, err) - - assert.Nil(t, cleanup(cli)) - setupSandbox() - sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() - err = startSandboxCluster(context.Background(), []string{}, cmdCtx) - assert.NotNil(t, err) - - assert.Nil(t, cleanup(cli)) - _, err = startContainer(cli, []mount.Mount{}) - assert.Nil(t, err) - - assert.Nil(t, cleanup(cli)) - ImageName = "" - _, err = startContainer(cli, []mount.Mount{}) - assert.NotNil(t, err) -} - -func TestGetSandbox(t *testing.T) { - cli, _ := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - assert.Nil(t, cleanup(cli)) - setupSandbox() - sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() - _ = startSandboxCluster(context.Background(), []string{}, cmdCtx) - - container := removeSandboxIfExist(cli, strings.NewReader("y")) - assert.Nil(t, container) -} diff --git a/flytectl/cmd/sandbox/start.go b/flytectl/cmd/sandbox/start.go index 913e752d62..33c91ea354 100644 --- a/flytectl/cmd/sandbox/start.go +++ b/flytectl/cmd/sandbox/start.go @@ -1,16 +1,18 @@ package sandbox import ( + "bufio" "context" "fmt" + "io" "os" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/docker/docker/api/types/mount" - "github.com/docker/docker/client" "github.com/enescakir/emoji" sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" cmdCore "github.com/flyteorg/flytectl/cmd/core" - f "github.com/flyteorg/flytectl/pkg/filesystemutils" ) const ( @@ -29,14 +31,6 @@ Usage ` ) -var volumes = []mount.Mount{ - { - Type: mount.TypeBind, - Source: f.FilePathJoin(f.UserHomeDir(), ".flyte"), - Target: K3sDir, - }, -} - type ExecResult struct { StdOut string StdErr string @@ -44,47 +38,66 @@ type ExecResult struct { } func startSandboxCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { - fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) - cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + cli, err := docker.GetDockerClient() if err != nil { - fmt.Printf("%v Please Check your docker client %v \n", emoji.GrimacingFace, emoji.Whale) return err } - if err := setupFlytectlConfig(); err != nil { + reader, err := startSandbox(ctx, cli, os.Stdin) + if err != nil { return err } + docker.WaitForSandbox(reader, docker.SuccessMessage) + return nil +} - if err := removeSandboxIfExist(cli, os.Stdin); err != nil { - return err +func startSandbox(ctx context.Context, cli docker.Docker, reader io.Reader) (*bufio.Scanner, error) { + fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) + if err := docker.SetupFlyteDir(); err != nil { + return nil, err + } + + if err := docker.GetFlyteSandboxConfig(); err != nil { + return nil, err + } + + if err := docker.RemoveSandbox(ctx, cli, reader); err != nil { + return nil, err } if len(sandboxConfig.DefaultConfig.SnacksRepo) > 0 { - volumes = append(volumes, mount.Mount{ + docker.Volumes = append(docker.Volumes, mount.Mount{ Type: mount.TypeBind, Source: sandboxConfig.DefaultConfig.SnacksRepo, - Target: flyteSnackDir, + Target: docker.FlyteSnackDir, }) } - os.Setenv("KUBECONFIG", Kubeconfig) - os.Setenv("FLYTECTL_CONFIG", FlytectlConfig) + os.Setenv("KUBECONFIG", docker.Kubeconfig) + os.Setenv("FLYTECTL_CONFIG", docker.FlytectlConfig) + if err := docker.PullDockerImage(ctx, cli, docker.ImageName); err != nil { + return nil, err + } - defer func() { - if r := recover(); r != nil { - fmt.Printf("%v Something went horribly wrong! %s\n", emoji.GrimacingFace, r) - } - }() + exposedPorts, portBindings, _ := docker.GetSandboxPorts() + ID, err := docker.StartContainer(ctx, cli, docker.Volumes, exposedPorts, portBindings, docker.FlyteSandboxClusterName, docker.ImageName) + if err != nil { + fmt.Printf("%v Something went wrong: Failed to start Sandbox container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale) + return nil, err + } - ID, err := startContainer(cli, volumes) + _, errCh := docker.WatchError(ctx, cli, ID) + logReader, err := docker.ReadLogs(ctx, cli, ID) if err != nil { - fmt.Printf("%v Something went horribly wrong: Failed to start Sandbox container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale) - return fmt.Errorf("error: %v", err) + return nil, err } + go func() { + err := <-errCh + if err != nil { + fmt.Printf("err: %v", err) + os.Exit(1) + } + }() - _ = readLogs(cli, ID, SuccessMessage) - fmt.Printf("Add KUBECONFIG and FLYTECTL_CONFIG to your environment variable \n") - fmt.Printf("export KUBECONFIG=%v \n", Kubeconfig) - fmt.Printf("export FLYTECTL_CONFIG=%v \n", FlytectlConfig) - return nil + return logReader, nil } diff --git a/flytectl/cmd/sandbox/start_test.go b/flytectl/cmd/sandbox/start_test.go new file mode 100644 index 0000000000..1f29fe5464 --- /dev/null +++ b/flytectl/cmd/sandbox/start_test.go @@ -0,0 +1,355 @@ +package sandbox + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + "testing" + + cmdCore "github.com/flyteorg/flytectl/cmd/core" + + sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + f "github.com/flyteorg/flytectl/pkg/filesystemutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestStartSandboxFunc(t *testing.T) { + p1, p2, _ := docker.GetSandboxPorts() + + t.Run("Successfully run sandbox cluster", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Successfully run sandbox cluster with flytesnacks", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() + volumes := append(docker.Volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.SnacksRepo, + Target: docker.FlyteSnackDir, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Error in pulling image", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() + volumes := append(docker.Volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.SnacksRepo, + Target: docker.FlyteSnackDir, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("error")) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in removing existing cluster", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() + volumes := append(docker.Volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.SnacksRepo, + Target: docker.FlyteSnackDir, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: "FlyteSandboxClusterName", + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("error")) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, strings.NewReader("y")) + assert.NotNil(t, err) + }) + t.Run("Error in start container", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() + volumes := append(docker.Volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.SnacksRepo, + Target: docker.FlyteSnackDir, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, fmt.Errorf("error")) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("error")) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in reading logs", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() + volumes := append(docker.Volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.SnacksRepo, + Target: docker.FlyteSnackDir, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, fmt.Errorf("error")) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in list container", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.SnacksRepo = f.UserHomeDir() + volumes := append(docker.Volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.SnacksRepo, + Target: docker.FlyteSnackDir, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, fmt.Errorf("error")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := startSandbox(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Successfully run sandbox cluster command", func(t *testing.T) { + mockOutStream := new(io.Writer) + ctx := context.Background() + cmdCtx := cmdCore.NewCommandContext(nil, *mockOutStream) + mockDocker := &mocks.Docker{} + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + stringReader := strings.NewReader(docker.SuccessMessage) + reader := ioutil.NopCloser(stringReader) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(reader, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + docker.Client = mockDocker + sandboxConfig.DefaultConfig.SnacksRepo = "" + err := startSandboxCluster(ctx, []string{}, cmdCtx) + assert.Nil(t, err) + }) + t.Run("Error in running sandbox cluster command", func(t *testing.T) { + mockOutStream := new(io.Writer) + ctx := context.Background() + cmdCtx := cmdCore.NewCommandContext(nil, *mockOutStream) + mockDocker := &mocks.Docker{} + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: docker.ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("error")) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, fmt.Errorf("error")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + stringReader := strings.NewReader(docker.SuccessMessage) + reader := ioutil.NopCloser(stringReader) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(reader, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + docker.Client = mockDocker + sandboxConfig.DefaultConfig.SnacksRepo = "" + err := startSandboxCluster(ctx, []string{}, cmdCtx) + assert.NotNil(t, err) + }) +} diff --git a/flytectl/cmd/sandbox/teardown.go b/flytectl/cmd/sandbox/teardown.go index 9f2a217f25..9d05a2581b 100644 --- a/flytectl/cmd/sandbox/teardown.go +++ b/flytectl/cmd/sandbox/teardown.go @@ -4,10 +4,11 @@ import ( "context" "fmt" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/docker/docker/api/types" "github.com/enescakir/emoji" - "github.com/docker/docker/client" cmdCore "github.com/flyteorg/flytectl/cmd/core" ) @@ -25,19 +26,24 @@ Usage ) func teardownSandboxCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { - - cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + cli, err := docker.GetDockerClient() if err != nil { return err } - c := getSandbox(cli) + return tearDownSandbox(ctx, cli) +} + +func tearDownSandbox(ctx context.Context, cli docker.Docker) error { + c := docker.GetSandbox(ctx, cli) if c != nil { - _ = cli.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ + if err := cli.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ Force: true, - }) + }); err != nil { + return err + } } - if err := configCleanup(); err != nil { + if err := docker.ConfigCleanup(); err != nil { fmt.Printf("Config cleanup failed. Which Failed due to %v \n ", err) } fmt.Printf("%v %v Sandbox cluster is removed successfully. \n", emoji.Broom, emoji.Broom) diff --git a/flytectl/cmd/sandbox/teardown_test.go b/flytectl/cmd/sandbox/teardown_test.go new file mode 100644 index 0000000000..01511d926e --- /dev/null +++ b/flytectl/cmd/sandbox/teardown_test.go @@ -0,0 +1,59 @@ +package sandbox + +import ( + "context" + "fmt" + "io" + "testing" + + cmdCore "github.com/flyteorg/flytectl/cmd/core" + + "github.com/docker/docker/api/types" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var containers []types.Container + +func TestTearDownFunc(t *testing.T) { + container1 := types.Container{ + ID: "FlyteSandboxClusterName", + Names: []string{ + docker.FlyteSandboxClusterName, + }, + } + containers = append(containers, container1) + + t.Run("Success", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + + err := tearDownSandbox(ctx, mockDocker) + assert.Nil(t, err) + }) + t.Run("Error", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("err")) + err := tearDownSandbox(ctx, mockDocker) + assert.NotNil(t, err) + }) + +} + +func TestTearDownClusterFunc(t *testing.T) { + mockOutStream := new(io.Writer) + ctx := context.Background() + cmdCtx := cmdCore.NewCommandContext(nil, *mockOutStream) + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + docker.Client = mockDocker + err := teardownSandboxCluster(ctx, []string{}, cmdCtx) + assert.Nil(t, err) +} diff --git a/flytectl/go.mod b/flytectl/go.mod index 728371c428..ad4c8f90cc 100644 --- a/flytectl/go.mod +++ b/flytectl/go.mod @@ -25,6 +25,7 @@ require ( github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/opencontainers/image-spec v1.0.1 // indirect github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 github.com/sirupsen/logrus v1.8.0 github.com/spf13/cobra v1.1.3 diff --git a/flytectl/pkg/docker/docker.go b/flytectl/pkg/docker/docker.go new file mode 100644 index 0000000000..f33b3b5219 --- /dev/null +++ b/flytectl/pkg/docker/docker.go @@ -0,0 +1,28 @@ +package docker + +import ( + "context" + "io" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/client" + specs "github.com/opencontainers/image-spec/specs-go/v1" +) + +//go:generate mockery -all -case=underscore + +type Docker interface { + ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *specs.Platform, containerName string) (container.ContainerCreateCreatedBody, error) + ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error + ImagePull(ctx context.Context, refStr string, options types.ImagePullOptions) (io.ReadCloser, error) + ContainerWait(ctx context.Context, containerID string, condition container.WaitCondition) (<-chan container.ContainerWaitOKBody, <-chan error) + ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) + ContainerRemove(ctx context.Context, containerID string, options types.ContainerRemoveOptions) error + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) +} + +type FlyteDocker struct { + *client.Client +} diff --git a/flytectl/pkg/docker/mocks/docker.go b/flytectl/pkg/docker/mocks/docker.go new file mode 100644 index 0000000000..917546fbee --- /dev/null +++ b/flytectl/pkg/docker/mocks/docker.go @@ -0,0 +1,293 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + container "github.com/docker/docker/api/types/container" + + io "io" + + mock "github.com/stretchr/testify/mock" + + network "github.com/docker/docker/api/types/network" + + types "github.com/docker/docker/api/types" + + v1 "github.com/opencontainers/image-spec/specs-go/v1" +) + +// Docker is an autogenerated mock type for the Docker type +type Docker struct { + mock.Mock +} + +type Docker_ContainerCreate struct { + *mock.Call +} + +func (_m Docker_ContainerCreate) Return(_a0 container.ContainerCreateCreatedBody, _a1 error) *Docker_ContainerCreate { + return &Docker_ContainerCreate{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Docker) OnContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *v1.Platform, containerName string) *Docker_ContainerCreate { + c := _m.On("ContainerCreate", ctx, config, hostConfig, networkingConfig, platform, containerName) + return &Docker_ContainerCreate{Call: c} +} + +func (_m *Docker) OnContainerCreateMatch(matchers ...interface{}) *Docker_ContainerCreate { + c := _m.On("ContainerCreate", matchers...) + return &Docker_ContainerCreate{Call: c} +} + +// ContainerCreate provides a mock function with given fields: ctx, config, hostConfig, networkingConfig, platform, containerName +func (_m *Docker) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *v1.Platform, containerName string) (container.ContainerCreateCreatedBody, error) { + ret := _m.Called(ctx, config, hostConfig, networkingConfig, platform, containerName) + + var r0 container.ContainerCreateCreatedBody + if rf, ok := ret.Get(0).(func(context.Context, *container.Config, *container.HostConfig, *network.NetworkingConfig, *v1.Platform, string) container.ContainerCreateCreatedBody); ok { + r0 = rf(ctx, config, hostConfig, networkingConfig, platform, containerName) + } else { + r0 = ret.Get(0).(container.ContainerCreateCreatedBody) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *container.Config, *container.HostConfig, *network.NetworkingConfig, *v1.Platform, string) error); ok { + r1 = rf(ctx, config, hostConfig, networkingConfig, platform, containerName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type Docker_ContainerList struct { + *mock.Call +} + +func (_m Docker_ContainerList) Return(_a0 []types.Container, _a1 error) *Docker_ContainerList { + return &Docker_ContainerList{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Docker) OnContainerList(ctx context.Context, options types.ContainerListOptions) *Docker_ContainerList { + c := _m.On("ContainerList", ctx, options) + return &Docker_ContainerList{Call: c} +} + +func (_m *Docker) OnContainerListMatch(matchers ...interface{}) *Docker_ContainerList { + c := _m.On("ContainerList", matchers...) + return &Docker_ContainerList{Call: c} +} + +// ContainerList provides a mock function with given fields: ctx, options +func (_m *Docker) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { + ret := _m.Called(ctx, options) + + var r0 []types.Container + if rf, ok := ret.Get(0).(func(context.Context, types.ContainerListOptions) []types.Container); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.Container) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, types.ContainerListOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type Docker_ContainerLogs struct { + *mock.Call +} + +func (_m Docker_ContainerLogs) Return(_a0 io.ReadCloser, _a1 error) *Docker_ContainerLogs { + return &Docker_ContainerLogs{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Docker) OnContainerLogs(ctx context.Context, _a1 string, options types.ContainerLogsOptions) *Docker_ContainerLogs { + c := _m.On("ContainerLogs", ctx, _a1, options) + return &Docker_ContainerLogs{Call: c} +} + +func (_m *Docker) OnContainerLogsMatch(matchers ...interface{}) *Docker_ContainerLogs { + c := _m.On("ContainerLogs", matchers...) + return &Docker_ContainerLogs{Call: c} +} + +// ContainerLogs provides a mock function with given fields: ctx, _a1, options +func (_m *Docker) ContainerLogs(ctx context.Context, _a1 string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + ret := _m.Called(ctx, _a1, options) + + var r0 io.ReadCloser + if rf, ok := ret.Get(0).(func(context.Context, string, types.ContainerLogsOptions) io.ReadCloser); ok { + r0 = rf(ctx, _a1, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(io.ReadCloser) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, types.ContainerLogsOptions) error); ok { + r1 = rf(ctx, _a1, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type Docker_ContainerRemove struct { + *mock.Call +} + +func (_m Docker_ContainerRemove) Return(_a0 error) *Docker_ContainerRemove { + return &Docker_ContainerRemove{Call: _m.Call.Return(_a0)} +} + +func (_m *Docker) OnContainerRemove(ctx context.Context, containerID string, options types.ContainerRemoveOptions) *Docker_ContainerRemove { + c := _m.On("ContainerRemove", ctx, containerID, options) + return &Docker_ContainerRemove{Call: c} +} + +func (_m *Docker) OnContainerRemoveMatch(matchers ...interface{}) *Docker_ContainerRemove { + c := _m.On("ContainerRemove", matchers...) + return &Docker_ContainerRemove{Call: c} +} + +// ContainerRemove provides a mock function with given fields: ctx, containerID, options +func (_m *Docker) ContainerRemove(ctx context.Context, containerID string, options types.ContainerRemoveOptions) error { + ret := _m.Called(ctx, containerID, options) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, types.ContainerRemoveOptions) error); ok { + r0 = rf(ctx, containerID, options) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type Docker_ContainerStart struct { + *mock.Call +} + +func (_m Docker_ContainerStart) Return(_a0 error) *Docker_ContainerStart { + return &Docker_ContainerStart{Call: _m.Call.Return(_a0)} +} + +func (_m *Docker) OnContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) *Docker_ContainerStart { + c := _m.On("ContainerStart", ctx, containerID, options) + return &Docker_ContainerStart{Call: c} +} + +func (_m *Docker) OnContainerStartMatch(matchers ...interface{}) *Docker_ContainerStart { + c := _m.On("ContainerStart", matchers...) + return &Docker_ContainerStart{Call: c} +} + +// ContainerStart provides a mock function with given fields: ctx, containerID, options +func (_m *Docker) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error { + ret := _m.Called(ctx, containerID, options) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, types.ContainerStartOptions) error); ok { + r0 = rf(ctx, containerID, options) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type Docker_ContainerWait struct { + *mock.Call +} + +func (_m Docker_ContainerWait) Return(_a0 <-chan container.ContainerWaitOKBody, _a1 <-chan error) *Docker_ContainerWait { + return &Docker_ContainerWait{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Docker) OnContainerWait(ctx context.Context, containerID string, condition container.WaitCondition) *Docker_ContainerWait { + c := _m.On("ContainerWait", ctx, containerID, condition) + return &Docker_ContainerWait{Call: c} +} + +func (_m *Docker) OnContainerWaitMatch(matchers ...interface{}) *Docker_ContainerWait { + c := _m.On("ContainerWait", matchers...) + return &Docker_ContainerWait{Call: c} +} + +// ContainerWait provides a mock function with given fields: ctx, containerID, condition +func (_m *Docker) ContainerWait(ctx context.Context, containerID string, condition container.WaitCondition) (<-chan container.ContainerWaitOKBody, <-chan error) { + ret := _m.Called(ctx, containerID, condition) + + var r0 <-chan container.ContainerWaitOKBody + if rf, ok := ret.Get(0).(func(context.Context, string, container.WaitCondition) <-chan container.ContainerWaitOKBody); ok { + r0 = rf(ctx, containerID, condition) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan container.ContainerWaitOKBody) + } + } + + var r1 <-chan error + if rf, ok := ret.Get(1).(func(context.Context, string, container.WaitCondition) <-chan error); ok { + r1 = rf(ctx, containerID, condition) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(<-chan error) + } + } + + return r0, r1 +} + +type Docker_ImagePull struct { + *mock.Call +} + +func (_m Docker_ImagePull) Return(_a0 io.ReadCloser, _a1 error) *Docker_ImagePull { + return &Docker_ImagePull{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Docker) OnImagePull(ctx context.Context, refStr string, options types.ImagePullOptions) *Docker_ImagePull { + c := _m.On("ImagePull", ctx, refStr, options) + return &Docker_ImagePull{Call: c} +} + +func (_m *Docker) OnImagePullMatch(matchers ...interface{}) *Docker_ImagePull { + c := _m.On("ImagePull", matchers...) + return &Docker_ImagePull{Call: c} +} + +// ImagePull provides a mock function with given fields: ctx, refStr, options +func (_m *Docker) ImagePull(ctx context.Context, refStr string, options types.ImagePullOptions) (io.ReadCloser, error) { + ret := _m.Called(ctx, refStr, options) + + var r0 io.ReadCloser + if rf, ok := ret.Get(0).(func(context.Context, string, types.ImagePullOptions) io.ReadCloser); ok { + r0 = rf(ctx, refStr, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(io.ReadCloser) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, types.ImagePullOptions) error); ok { + r1 = rf(ctx, refStr, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/flytectl/pkg/docker/sandbox_util.go b/flytectl/pkg/docker/sandbox_util.go new file mode 100644 index 0000000000..4ce230581b --- /dev/null +++ b/flytectl/pkg/docker/sandbox_util.go @@ -0,0 +1,194 @@ +package docker + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "strings" + + "github.com/flyteorg/flytectl/pkg/util" + + "github.com/docker/docker/client" + + "github.com/enescakir/emoji" + + cmdUtil "github.com/flyteorg/flytectl/pkg/commandutils" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + "github.com/docker/go-connections/nat" + f "github.com/flyteorg/flytectl/pkg/filesystemutils" +) + +var ( + Kubeconfig = f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s", "k3s.yaml") + FlytectlConfig = f.FilePathJoin(f.UserHomeDir(), ".flyte", "config-sandbox.yaml") + SuccessMessage = "Flyte is ready! Flyte UI is available at http://localhost:30081/console" + ImageName = "cr.flyte.org/flyteorg/flyte-sandbox:dind" + FlyteSandboxClusterName = "flyte-sandbox" + Environment = []string{"SANDBOX=1", "KUBERNETES_API_PORT=30086", "FLYTE_HOST=localhost:30081", "FLYTE_AWS_ENDPOINT=http://localhost:30084"} + FlyteSnackDir = "/usr/src" + K3sDir = "/etc/rancher/" + Client Docker + Volumes = []mount.Mount{ + { + Type: mount.TypeBind, + Source: f.FilePathJoin(f.UserHomeDir(), ".flyte"), + Target: K3sDir, + }, + } +) + +// SetupFlyteDir will create .flyte dir if not exist +func SetupFlyteDir() error { + if err := os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte"), 0755); err != nil { + return err + } + return nil +} + +// GetFlyteSandboxConfig download the flyte sandbox config +func GetFlyteSandboxConfig() error { + response, err := util.GetRequest("https://raw.githubusercontent.com", "/flyteorg/flytectl/master/config.yaml") + if err != nil { + return err + } + + return util.WriteIntoFile(response, FlytectlConfig) +} + +// ConfigCleanup will remove the sandbox config from flyte dir +func ConfigCleanup() error { + err := os.Remove(FlytectlConfig) + if err != nil { + return err + } + err = os.RemoveAll(f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s")) + if err != nil { + return err + } + return nil +} + +// GetSandbox will return sandbox container if it exist +func GetSandbox(ctx context.Context, cli Docker) *types.Container { + containers, _ := cli.ContainerList(ctx, types.ContainerListOptions{ + All: true, + }) + for _, v := range containers { + if strings.Contains(v.Names[0], FlyteSandboxClusterName) { + return &v + } + } + return nil +} + +// RemoveSandbox will remove sandbox container if exist +func RemoveSandbox(ctx context.Context, cli Docker, reader io.Reader) error { + if c := GetSandbox(ctx, cli); c != nil { + if cmdUtil.AskForConfirmation("delete existing sandbox cluster", reader) { + err := cli.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ + Force: true, + }) + return err + } + return nil + } + return nil +} + +// GetSandboxPorts will return sandbox ports +func GetSandboxPorts() (map[nat.Port]struct{}, map[nat.Port][]nat.PortBinding, error) { + return nat.ParsePortSpecs([]string{ + "127.0.0.1:30086:30086", + "127.0.0.1:30081:30081", + "127.0.0.1:30082:30082", + "127.0.0.1:30084:30084", + }) +} + +// PullDockerImage will Pull docker image +func PullDockerImage(ctx context.Context, cli Docker, image string) error { + r, err := cli.ImagePull(ctx, image, types.ImagePullOptions{}) + if err != nil { + return err + } + _, err = io.Copy(os.Stdout, r) + return err +} + +//StartContainer will create and start docker container +func StartContainer(ctx context.Context, cli Docker, volumes []mount.Mount, exposedPorts map[nat.Port]struct{}, portBindings map[nat.Port][]nat.PortBinding, name, image string) (string, error) { + resp, err := cli.ContainerCreate(ctx, &container.Config{ + Env: Environment, + Image: image, + Tty: false, + ExposedPorts: exposedPorts, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: portBindings, + Privileged: true, + }, nil, + nil, name) + + if err != nil { + return "", err + } + + if err := cli.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil { + return "", err + } + return resp.ID, nil +} + +// WatchError will return channel for watching errors of a container +func WatchError(ctx context.Context, cli Docker, id string) (<-chan container.ContainerWaitOKBody, <-chan error) { + return cli.ContainerWait(context.Background(), id, container.WaitConditionNotRunning) +} + +// ReadLogs will return io scanner for reading the logs of a container +func ReadLogs(ctx context.Context, cli Docker, id string) (*bufio.Scanner, error) { + reader, err := cli.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }) + if err != nil { + return nil, err + } + return bufio.NewScanner(reader), nil +} + +// WaitForSandbox will wait until it doesn't get success message +func WaitForSandbox(reader *bufio.Scanner, message string) bool { + for reader.Scan() { + if strings.Contains(reader.Text(), message) { + fmt.Printf("%v %v %v %v %v \n", emoji.ManTechnologist, message, emoji.Rocket, emoji.Rocket, emoji.PartyPopper) + fmt.Printf("Please visit https://github.com/flyteorg/flytesnacks for more example %v \n", emoji.Rocket) + fmt.Printf("Register all flytesnacks example by running 'flytectl register examples -d development -p flytesnacks' \n") + fmt.Printf("Add KUBECONFIG and FLYTECTL_CONFIG to your environment variable \n") + fmt.Printf("export KUBECONFIG=%v \n", Kubeconfig) + fmt.Printf("export FLYTECTL_CONFIG=%v \n", FlytectlConfig) + return true + } + fmt.Println(reader.Text()) + } + return false +} + +// GetDockerClient will returns the docker client +func GetDockerClient() (Docker, error) { + if Client == nil { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + fmt.Printf("%v Please Check your docker client %v \n", emoji.GrimacingFace, emoji.Whale) + return nil, err + } + return cli, nil + } + return Client, nil +} diff --git a/flytectl/pkg/docker/sandbox_util_test.go b/flytectl/pkg/docker/sandbox_util_test.go new file mode 100644 index 0000000000..b39e03f4f6 --- /dev/null +++ b/flytectl/pkg/docker/sandbox_util_test.go @@ -0,0 +1,315 @@ +package docker + +import ( + "bufio" + "context" + "fmt" + + "github.com/docker/docker/api/types/container" + + //"github.com/docker/go-connections/nat" + "io/ioutil" + "os" + "strings" + "testing" + + "github.com/flyteorg/flytectl/pkg/docker/mocks" + "github.com/stretchr/testify/mock" + + "github.com/docker/docker/api/types" + cmdCore "github.com/flyteorg/flytectl/cmd/core" + u "github.com/flyteorg/flytectl/cmd/testutils" + + f "github.com/flyteorg/flytectl/pkg/filesystemutils" + + "github.com/stretchr/testify/assert" +) + +var ( + cmdCtx cmdCore.CommandContext + containers []types.Container +) + +func setupSandbox() { + mockAdminClient := u.MockClient + cmdCtx = cmdCore.NewCommandContext(mockAdminClient, u.MockOutStream) + _ = SetupFlyteDir() + container1 := types.Container{ + ID: "FlyteSandboxClusterName", + Names: []string{ + FlyteSandboxClusterName, + }, + } + containers = append(containers, container1) +} + +func TestConfigCleanup(t *testing.T) { + _, err := os.Stat(f.FilePathJoin(f.UserHomeDir(), ".flyte")) + if os.IsNotExist(err) { + _ = os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte"), 0755) + } + _ = ioutil.WriteFile(FlytectlConfig, []byte("string"), 0600) + _ = ioutil.WriteFile(Kubeconfig, []byte("string"), 0600) + + err = ConfigCleanup() + assert.Nil(t, err) + + _, err = os.Stat(FlytectlConfig) + check := os.IsNotExist(err) + assert.Equal(t, check, true) + + _, err = os.Stat(Kubeconfig) + check = os.IsNotExist(err) + assert.Equal(t, check, true) + _ = ConfigCleanup() +} + +func TestSetupFlytectlConfig(t *testing.T) { + _, err := os.Stat(f.FilePathJoin(f.UserHomeDir(), ".flyte")) + if os.IsNotExist(err) { + _ = os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte"), 0755) + } + err = SetupFlyteDir() + assert.Nil(t, err) + err = GetFlyteSandboxConfig() + assert.Nil(t, err) + _, err = os.Stat(FlytectlConfig) + assert.Nil(t, err) + check := os.IsNotExist(err) + assert.Equal(t, check, false) + _ = ConfigCleanup() +} + +func TestGetSandbox(t *testing.T) { + setupSandbox() + t.Run("Successfully get sandbox container", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + + mockDocker.OnContainerList(context, types.ContainerListOptions{All: true}).Return(containers, nil) + c := GetSandbox(context, mockDocker) + assert.Equal(t, c.Names[0], FlyteSandboxClusterName) + }) + + t.Run("Successfully get sandbox container with zero result", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + + mockDocker.OnContainerList(context, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + c := GetSandbox(context, mockDocker) + assert.Nil(t, c) + }) + + t.Run("Error in get sandbox container", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + + mockDocker.OnContainerList(context, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(context, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + err := RemoveSandbox(context, mockDocker, strings.NewReader("y")) + assert.Nil(t, err) + }) + +} + +func TestRemoveSandboxWithNoReply(t *testing.T) { + setupSandbox() + t.Run("Successfully remove sandbox container", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + + // Verify the attributes + mockDocker.OnContainerList(context, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(context, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + err := RemoveSandbox(context, mockDocker, strings.NewReader("n")) + assert.Nil(t, err) + }) + + t.Run("Successfully remove sandbox container with zero sandbox containers are running", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + + // Verify the attributes + mockDocker.OnContainerList(context, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnContainerRemove(context, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + err := RemoveSandbox(context, mockDocker, strings.NewReader("n")) + assert.Nil(t, err) + }) + +} + +func TestPullDockerImage(t *testing.T) { + t.Run("Successfully pull image", func(t *testing.T) { + setupSandbox() + mockDocker := &mocks.Docker{} + context := context.Background() + // Verify the attributes + mockDocker.OnImagePullMatch(context, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + err := PullDockerImage(context, mockDocker, "nginx") + assert.Nil(t, err) + }) + + t.Run("Error in pull image", func(t *testing.T) { + setupSandbox() + mockDocker := &mocks.Docker{} + context := context.Background() + // Verify the attributes + mockDocker.OnImagePullMatch(context, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("error")) + err := PullDockerImage(context, mockDocker, "nginx") + assert.NotNil(t, err) + }) + +} + +func TestStartContainer(t *testing.T) { + p1, p2, _ := GetSandboxPorts() + + t.Run("Successfully create a container", func(t *testing.T) { + setupSandbox() + mockDocker := &mocks.Docker{} + context := context.Background() + + // Verify the attributes + mockDocker.OnContainerCreate(context, &container.Config{ + Env: Environment, + Image: ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(context, "Hello", types.ContainerStartOptions{}).Return(nil) + id, err := StartContainer(context, mockDocker, Volumes, p1, p2, "nginx", ImageName) + assert.Nil(t, err) + assert.Greater(t, len(id), 0) + assert.Equal(t, id, "Hello") + }) + + t.Run("Error in creating container", func(t *testing.T) { + setupSandbox() + mockDocker := &mocks.Docker{} + context := context.Background() + + // Verify the attributes + mockDocker.OnContainerCreate(context, &container.Config{ + Env: Environment, + Image: ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "", + }, fmt.Errorf("error")) + mockDocker.OnContainerStart(context, "Hello", types.ContainerStartOptions{}).Return(nil) + id, err := StartContainer(context, mockDocker, Volumes, p1, p2, "nginx", ImageName) + assert.NotNil(t, err) + assert.Equal(t, len(id), 0) + assert.Equal(t, id, "") + }) + + t.Run("Error in start of a container", func(t *testing.T) { + setupSandbox() + mockDocker := &mocks.Docker{} + context := context.Background() + + // Verify the attributes + mockDocker.OnContainerCreate(context, &container.Config{ + Env: Environment, + Image: ImageName, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(context, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("error")) + id, err := StartContainer(context, mockDocker, Volumes, p1, p2, "nginx", ImageName) + assert.NotNil(t, err) + assert.Equal(t, len(id), 0) + assert.Equal(t, id, "") + }) +} + +func TestWatchError(t *testing.T) { + setupSandbox() + mockDocker := &mocks.Docker{} + context := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker.OnContainerWaitMatch(context, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err := WatchError(context, mockDocker, "test") + assert.NotNil(t, err) +} + +func TestReadLogs(t *testing.T) { + setupSandbox() + + t.Run("Successfully read logs", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + mockDocker.OnContainerLogsMatch(context, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + _, err := ReadLogs(context, mockDocker, "test") + assert.Nil(t, err) + }) + + t.Run("Error in reading logs", func(t *testing.T) { + mockDocker := &mocks.Docker{} + context := context.Background() + mockDocker.OnContainerLogsMatch(context, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, fmt.Errorf("error")) + _, err := ReadLogs(context, mockDocker, "test") + assert.NotNil(t, err) + }) +} + +func TestWaitForSandbox(t *testing.T) { + setupSandbox() + t.Run("Successfully read logs ", func(t *testing.T) { + reader := bufio.NewScanner(strings.NewReader("hello \n Flyte")) + + check := WaitForSandbox(reader, "Flyte") + assert.Equal(t, true, check) + }) + + t.Run("Error in reading logs ", func(t *testing.T) { + reader := bufio.NewScanner(strings.NewReader("")) + check := WaitForSandbox(reader, "Flyte") + assert.Equal(t, false, check) + }) +} + +func TestDockerClient(t *testing.T) { + t.Run("Successfully get docker mock client", func(t *testing.T) { + mockDocker := &mocks.Docker{} + Client = mockDocker + cli, err := GetDockerClient() + assert.Nil(t, err) + assert.NotNil(t, cli) + }) + t.Run("Successfully get docker client", func(t *testing.T) { + Client = nil + cli, err := GetDockerClient() + assert.Nil(t, err) + assert.NotNil(t, cli) + }) + +}