Skip to content

Commit

Permalink
Add upgrade command with some refactor (flyteorg#152)
Browse files Browse the repository at this point in the history
* Added k8s check in sandbox
* Added upgrade command

Signed-off-by: Yuvraj <[email protected]>
  • Loading branch information
yindia authored and robert-ulbrich-mercedes-benz committed Jul 2, 2024
1 parent cab9995 commit 6d0c286
Show file tree
Hide file tree
Showing 25 changed files with 1,284 additions and 222 deletions.
7 changes: 6 additions & 1 deletion flytectl/.github/workflows/sandbox.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }}
- name: Build Flytectl binary
run: make compile
- name: Setup env
run: |
mkdir -p ~/.flyte/k3s && touch ~/.flyte/k3s/k3s.yaml && chmod 666 ~/.flyte/k3s/k3s.yaml
- name: Create a sandbox cluster
run: bin/flytectl sandbox start
- name: Setup flytectl config
run: bin/flytectl config init
- name: Register cookbook
run: bin/flytectl register examples -d development -p flytesnacks || true
run: bin/flytectl register examples -d development -p flytesnacks
- name: Teardown Sandbox cluster
run: bin/flytectl sandbox teardown
9 changes: 7 additions & 2 deletions flytectl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/flyteorg/flytectl/cmd/get"
"github.com/flyteorg/flytectl/cmd/register"
"github.com/flyteorg/flytectl/cmd/update"
"github.com/flyteorg/flytectl/cmd/upgrade"
"github.com/flyteorg/flytectl/cmd/version"
"github.com/flyteorg/flytectl/pkg/printer"
stdConfig "github.com/flyteorg/flytestdlib/config"
Expand Down Expand Up @@ -66,8 +67,12 @@ func newRootCmd() *cobra.Command {
rootCmd.AddCommand(configuration.CreateConfigCommand())
rootCmd.AddCommand(completionCmd)
// Added version command
versioncmd := version.GetVersionCommand(rootCmd)
cmdCore.AddCommands(rootCmd, versioncmd)
versionCmd := version.GetVersionCommand(rootCmd)
cmdCore.AddCommands(rootCmd, versionCmd)

// Added upgrade command
upgradeCmd := upgrade.SelfUpgrade(rootCmd)
cmdCore.AddCommands(rootCmd, upgradeCmd)

config.GetConfig()

Expand Down
4 changes: 2 additions & 2 deletions flytectl/cmd/sandbox/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ func sandboxClusterExec(ctx context.Context, args []string, cmdCtx cmdCore.Comma
return err
}
if len(args) > 0 {
return Execute(ctx, cli, args)
return execute(ctx, cli, args)
}
return fmt.Errorf("missing argument. Please check usage examples by running flytectl sandbox exec --help")
}

func Execute(ctx context.Context, cli docker.Docker, args []string) error {
func execute(ctx context.Context, cli docker.Docker, args []string) error {
c := docker.GetSandbox(ctx, cli)
if c != nil {
exec, err := docker.ExecCommend(ctx, cli, c.ID, args)
Expand Down
182 changes: 132 additions & 50 deletions flytectl/cmd/sandbox/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@ import (
"io"
"os"
"path/filepath"
"strings"
"time"

"github.com/flyteorg/flytectl/clierrors"

"github.com/docker/docker/api/types/mount"
"github.com/avast/retry-go"
"github.com/olekukonko/tablewriter"
corev1api "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/flyteorg/flytectl/pkg/configutil"
"github.com/flyteorg/flytectl/pkg/util/githubutil"

f "github.com/flyteorg/flytectl/pkg/filesystemutils"
"github.com/flyteorg/flytectl/pkg/util"
"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flytectl/pkg/docker"
"github.com/docker/docker/api/types/mount"
"github.com/flyteorg/flytectl/clierrors"
"github.com/flyteorg/flytectl/pkg/configutil"
"github.com/flyteorg/flytectl/pkg/k8s"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/enescakir/emoji"
sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox"
cmdCore "github.com/flyteorg/flytectl/cmd/core"
"github.com/flyteorg/flytectl/pkg/docker"
f "github.com/flyteorg/flytectl/pkg/filesystemutils"
"github.com/flyteorg/flytectl/pkg/util"
)

const (
Expand All @@ -47,14 +54,16 @@ Run specific version of flyte, Only available after v0.14.0+
Usage
`
GeneratedManifest = "/flyteorg/share/flyte_generated.yaml"
FlyteReleaseURL = "/flyteorg/flyte/releases/download/%v/flyte_sandbox_manifest.yaml"
FlyteMinimumVersionSupported = "v0.14.0"
GithubURL = "https://github.com"
k8sEndpoint = "https://127.0.0.1:30086"
flyteMinimumVersionSupported = "v0.14.0"
generatedManifest = "/flyteorg/share/flyte_generated.yaml"
flyteNamespace = "flyte"
diskPressureTaint = "node.kubernetes.io/disk-pressure"
taintEffect = "NoSchedule"
)

var (
FlyteManifest = f.FilePathJoin(f.UserHomeDir(), ".flyte", "flyte_generated.yaml")
flyteManifest = f.FilePathJoin(f.UserHomeDir(), ".flyte", "flyte_generated.yaml")
)

type ExecResult struct {
Expand All @@ -76,6 +85,24 @@ func startSandboxCluster(ctx context.Context, args []string, cmdCtx cmdCore.Comm
if reader != nil {
docker.WaitForSandbox(reader, docker.SuccessMessage)
}

var k8sClient k8s.K8s
err = retry.Do(
func() error {
k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, k8sEndpoint)
return err
},
retry.Attempts(10),
)
if err != nil {
return err
}

if err := watchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil {
return err
}

util.PrintSandboxMessage()
return nil
}

Expand All @@ -86,7 +113,8 @@ func startSandbox(ctx context.Context, cli docker.Docker, reader io.Reader) (*bu
if err.Error() != clierrors.ErrSandboxExists {
return nil, err
}
printExistingSandboxMessage()
fmt.Printf("Existing details of your sandbox:")
util.PrintSandboxMessage()
return nil, nil
}

Expand All @@ -110,14 +138,24 @@ func startSandbox(ctx context.Context, cli docker.Docker, reader io.Reader) (*bu
}

if len(sandboxConfig.DefaultConfig.Version) > 0 {
if err := downloadFlyteManifest(sandboxConfig.DefaultConfig.Version); err != nil {
isGreater, err := util.IsVersionGreaterThan(sandboxConfig.DefaultConfig.Version, flyteMinimumVersionSupported)
if err != nil {
return nil, err
}
vol, err := mountVolume(FlyteManifest, GeneratedManifest)
if err != nil {
if !isGreater {
logger.Infof(ctx, "version flag only supported after with flyte %s+ release", flyteMinimumVersionSupported)
return nil, fmt.Errorf("version flag only supported after with flyte %s+ release", flyteMinimumVersionSupported)
}
if err := githubutil.GetFlyteManifest(sandboxConfig.DefaultConfig.Version, flyteManifest); err != nil {
return nil, err
}
volumes = append(volumes, *vol)

if vol, err := mountVolume(flyteManifest, generatedManifest); err != nil {
return nil, err
} else if vol != nil {
volumes = append(volumes, *vol)
}

}

fmt.Printf("%v pulling docker image %s\n", emoji.Whale, docker.ImageName)
Expand All @@ -133,18 +171,10 @@ func startSandbox(ctx context.Context, cli docker.Docker, reader io.Reader) (*bu
return nil, err
}

_, errCh := docker.WatchError(ctx, cli, ID)
logReader, err := docker.ReadLogs(ctx, cli, ID)
if err != nil {
return nil, err
}
go func() {
err := <-errCh
if err != nil {
fmt.Printf("err: %v", err)
os.Exit(1)
}
}()

return logReader, nil
}
Expand All @@ -164,34 +194,86 @@ func mountVolume(file, destination string) (*mount.Mount, error) {
return nil, nil
}

func downloadFlyteManifest(version string) error {
isGreater, err := util.IsVersionGreaterThan(version, FlyteMinimumVersionSupported)
if err != nil {
return err
func watchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error {
var data = os.Stdout
table := tablewriter.NewWriter(data)
table.SetHeader([]string{"Service", "Status", "Namespace"})
table.SetRowLine(true)

for {
isTaint, err := isNodeTainted(ctx, appsClient)
if err != nil {
return err
}
if isTaint {
return fmt.Errorf("docker sandbox doesn't have sufficient memory available. Please run docker system prune -a --volumes")
}

pods, err := getFlyteDeployment(ctx, appsClient)
if err != nil {
return err
}
table.ClearRows()
table.SetAutoWrapText(false)
table.SetAutoFormatHeaders(true)

// Clear os.Stdout
_, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J")

var total, ready int
total = len(pods.Items)
ready = 0
if total != 0 {
for _, v := range pods.Items {
if isPodReady(v) {
ready++
}
if len(v.Status.Conditions) > 0 {
table.Append([]string{v.GetName(), string(v.Status.Phase), v.GetNamespace()})
}
}
table.Render()
if total == ready {
break
}
}

time.Sleep(40 * time.Second)
}
if !isGreater {
return fmt.Errorf("version flag only support %s+ flyte version", FlyteMinimumVersionSupported)

return nil
}

func isPodReady(v corev1api.Pod) bool {
if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) {
return true
}
response, err := util.GetRequest(GithubURL, fmt.Sprintf(FlyteReleaseURL, version))
return false
}

func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) {
pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{})
if err != nil {
return err
}
if err := util.WriteIntoFile(response, FlyteManifest); err != nil {
return err
return nil, err
}
return nil
return pods, nil
}

func printExistingSandboxMessage() {
kubeconfig := strings.Join([]string{
"$KUBECONFIG",
f.FilePathJoin(f.UserHomeDir(), ".kube", "config"),
docker.Kubeconfig,
}, ":")

fmt.Printf("Existing details of your sandbox:")
fmt.Printf("%v %v %v %v %v \n", emoji.ManTechnologist, docker.SuccessMessage, emoji.Rocket, emoji.Rocket, emoji.PartyPopper)
fmt.Printf("Add KUBECONFIG and FLYTECTL_CONFIG to your environment variable \n")
fmt.Printf("export KUBECONFIG=%v \n", kubeconfig)
fmt.Printf("export FLYTECTL_CONFIG=%v \n", configutil.FlytectlConfig)
func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, error) {
nodes, err := client.Nodes().List(ctx, v1.ListOptions{})
if err != nil {
return false, err
}
match := 0
for _, node := range nodes.Items {
for _, c := range node.Spec.Taints {
if c.Key == diskPressureTaint && c.Effect == taintEffect {
match++
}
}
}
if match > 0 {
return true, nil
}
return false, nil
}
Loading

0 comments on commit 6d0c286

Please sign in to comment.