Skip to content

Commit

Permalink
Improved CLI for get (flyteorg#32)
Browse files Browse the repository at this point in the history
* Improved CLI for get

 - Get now returns a summary table
 - Summary table also optionally shows resource quota usage (as
a percentage)

* updated Goimports

* remove unused metric
  • Loading branch information
Ketan Umare authored Dec 4, 2019
1 parent 8291add commit a71a62c
Showing 1 changed file with 150 additions and 6 deletions.
156 changes: 150 additions & 6 deletions cmd/kubectl-flyte/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ package cmd

import (
"fmt"
"sort"
"strings"

gotree "github.com/DiSiqueira/GoTree"
"github.com/lyft/flytepropeller/cmd/kubectl-flyte/cmd/printers"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/spf13/cobra"
v12 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flytepropeller/cmd/kubectl-flyte/cmd/printers"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

type GetOpts struct {
*RootOptions
detailsEnabledFlag bool
limit int64
chunkSize int64
showQuota bool
}

func NewGetCommand(opts *RootOptions) *cobra.Command {
Expand All @@ -38,6 +42,7 @@ func NewGetCommand(opts *RootOptions) *cobra.Command {
}

getCmd.Flags().BoolVarP(&getOpts.detailsEnabledFlag, "details", "d", false, "If details of node execs are desired.")
getCmd.Flags().BoolVarP(&getOpts.showQuota, "show-quota", "q", false, "Shows resource quota usage for that resource.")
getCmd.Flags().Int64VarP(&getOpts.chunkSize, "chunk-size", "c", 100, "Use this much batch size.")
getCmd.Flags().Int64VarP(&getOpts.limit, "limit", "l", -1, "Only get limit records. -1 => all records.")

Expand Down Expand Up @@ -97,6 +102,101 @@ func (g *GetOpts) iterateOverWorkflows(f func(*v1alpha1.FlyteWorkflow) error, ba
}
}

func (g *GetOpts) iterateOverQuotas(f func(quota *v12.ResourceQuota) error, batchSize int64, limit int64) error {
if limit > 0 && limit < batchSize {
batchSize = limit
}
t, err := g.GetTimeoutSeconds()
if err != nil {
return err
}
opts := v1.ListOptions{
Limit: batchSize,
TimeoutSeconds: &t,
}

var counter int64
for {
rq, err := g.kubeClient.CoreV1().ResourceQuotas(g.ConfigOverrides.Context.Namespace).List(opts)
if err != nil {
return err
}
for _, r := range rq.Items {
if err := f(&r); err != nil {
return err
}
counter++
if counter == limit {
return nil
}
}
if rq.Continue == "" {
return nil
}
opts.Continue = rq.Continue
}
}

type perNSCounter struct {
ns string
total int
succeeded int
failed int
waiting int
running int
hard v12.ResourceList
used v12.ResourceList
}

func Header() string {
return fmt.Sprintf("|%40s|%7s|%7s|%7s|%7s|%7s|%40s|", "Namespace", "Total", "Success", "Failed", "Running", "Waiting", "QuotasUsage")
}

func (v perNSCounter) CalculateQuotaString() string {
sb := strings.Builder{}
for k, q := range v.hard {
uq := v.used[k]
used, ok := uq.AsInt64()
if !ok {
continue
}
hard, ok := q.AsInt64()
if !ok {
continue
}
per := float64(used) / float64(hard) * 100.0
sb.WriteString(fmt.Sprintf("%s=%.2f%%,", k, per))
}
return sb.String()
}

func (v perNSCounter) String(quotas bool) string {
s := "-"
if quotas {
s = v.CalculateQuotaString()
}
return fmt.Sprintf("|%40s|%7d|%7d|%7d|%7d|%7d|%40s|", v.ns, v.total, v.succeeded, v.failed, v.running, v.waiting, s)
}

type SortableNSDist []*perNSCounter

func (s SortableNSDist) Len() int {
return len(s)
}

func (s SortableNSDist) Less(i, j int) bool {
if s[i].total == s[j].total {
return s[i].running < s[j].running
}
return s[i].total < s[j].total
}

func (s SortableNSDist) Swap(i, j int) {
w := s[i]
s[i] = s[j]
s[j] = w
}

func (g *GetOpts) listWorkflows() error {
fmt.Printf("Listing workflows in [%s]\n", g.ConfigOverrides.Context.Namespace)
wp := printers.WorkflowPrinter{}
Expand All @@ -106,25 +206,39 @@ func (g *GetOpts) listWorkflows() error {
var failed = 0
var running = 0
var waiting = 0
perNS := make(map[string]*perNSCounter)
err := g.iterateOverWorkflows(
func(w *v1alpha1.FlyteWorkflow) error {
counter++
if err := wp.PrintShort(workflows, w); err != nil {
return err
}
if _, ok := perNS[w.Namespace]; !ok {
perNS[w.Namespace] = &perNSCounter{
ns: w.Namespace,
}
}
c := perNS[w.Namespace]
c.total++
switch w.GetExecutionStatus().GetPhase() {
case v1alpha1.WorkflowPhaseReady:
c.waiting++
waiting++
case v1alpha1.WorkflowPhaseSuccess:
c.succeeded++
succeeded++
case v1alpha1.WorkflowPhaseFailed:
c.failed++
failed++
default:
running++
c.running++
}
if counter%g.chunkSize == 0 {
fmt.Println("")
fmt.Print(workflows.Print())
if g.detailsEnabledFlag {
fmt.Println("")
fmt.Print(workflows.Print())
}
workflows = gotree.New("\nworkflows")
} else {
fmt.Print(".")
Expand All @@ -135,8 +249,38 @@ func (g *GetOpts) listWorkflows() error {
return err
}

fmt.Print(workflows.Print())
fmt.Printf("Found %d workflows\n", counter)
if g.showQuota {
err := g.iterateOverQuotas(func(q *v12.ResourceQuota) error {
if _, ok := perNS[q.Namespace]; !ok {
perNS[q.Namespace] = &perNSCounter{
ns: q.Namespace,
}
}
c := perNS[q.Namespace]
c.hard = q.Status.Hard
c.used = q.Status.Used
return nil
}, g.chunkSize, g.limit)
if err != nil {
return err
}
}

if g.detailsEnabledFlag {
fmt.Print(workflows.Print())
}
fmt.Printf("\nFound %d workflows\n", counter)
fmt.Printf("Success: %d, Failed: %d, Running: %d, Waiting: %d\n", succeeded, failed, running, waiting)

perNSDist := make(SortableNSDist, 0, len(perNS))
for _, v := range perNS {
perNSDist = append(perNSDist, v)
}
sort.Sort(perNSDist)

fmt.Println(Header())
for _, v := range perNSDist {
fmt.Println(v.String(g.showQuota))
}
return nil
}

0 comments on commit a71a62c

Please sign in to comment.