Skip to content

Commit

Permalink
Filtering on activation state of workflows (flyteorg#305)
Browse files Browse the repository at this point in the history
* Added named entity fetch on workflow which also support state filter

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Fixed docs

Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Apr 8, 2022
1 parent 58f2ab7 commit a4702fb
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 24 deletions.
27 changes: 20 additions & 7 deletions flytectl/cmd/get/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Retrieve all the workflows within project and domain (workflow/workflows can be
flytectl get workflow -p flytesnacks -d development
Retrieve workflow by name within project and domain:
Retrieve all versions of a workflow by name within project and domain:
::
Expand Down Expand Up @@ -103,6 +103,14 @@ var listWorkflowColumns = []printer.Column{
{Header: "Created At", JSONPath: "$.closure.createdAt"},
}

var namedEntityColumns = []printer.Column{
{Header: "Project", JSONPath: "$.id.project"},
{Header: "Domain", JSONPath: "$.id.domain"},
{Header: "Name", JSONPath: "$.id.name"},
{Header: "Description", JSONPath: "$.metadata.description"},
{Header: "State", JSONPath: "$.metadata.state"},
}

func WorkflowToProtoMessages(l []*admin.Workflow) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
Expand All @@ -111,6 +119,14 @@ func WorkflowToProtoMessages(l []*admin.Workflow) []proto.Message {
return messages
}

func NamedEntityToProtoMessages(l []*admin.NamedEntity) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
messages = append(messages, m)
}
return messages
}

func WorkflowToTableProtoMessages(l []*admin.Workflow) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
Expand Down Expand Up @@ -155,16 +171,13 @@ func getWorkflowFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandC
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), columns, WorkflowToProtoMessages(workflows)...)
}

workflows, err = cmdCtx.AdminFetcherExt().FetchAllVerOfWorkflow(ctx, "", config.GetConfig().Project, config.GetConfig().Domain, workflowconfig.DefaultConfig.Filter)
nameEntities, err := cmdCtx.AdminFetcherExt().FetchAllWorkflows(ctx, config.GetConfig().Project, config.GetConfig().Domain, workflowconfig.DefaultConfig.Filter)
if err != nil {
return err
}

logger.Debugf(ctx, "Retrieved %v workflows", len(workflows))
if config.GetConfig().MustOutputFormat() == printer.OutputFormatTABLE {
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), listWorkflowColumns, WorkflowToTableProtoMessages(workflows)...)
}
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), listWorkflowColumns, WorkflowToProtoMessages(workflows)...)
logger.Debugf(ctx, "Retrieved %v workflows", len(nameEntities))
return adminPrinter.Print(config.GetConfig().MustOutputFormat(), namedEntityColumns, NamedEntityToProtoMessages(nameEntities)...)
}

// FetchWorkflowForName fetches the workflow give it name.
Expand Down
37 changes: 23 additions & 14 deletions flytectl/cmd/get/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,12 @@ import (
)

var (
resourceListRequestWorkflow *admin.ResourceListRequest
workflowListResponse *admin.WorkflowList
argsWf []string
workflow1 *admin.Workflow
workflows []*admin.Workflow
argsWf []string
workflow1 *admin.Workflow
workflows []*admin.Workflow
)

func getWorkflowSetup() {
resourceListRequestWorkflow = &admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
Project: projectValue,
Domain: domainValue,
},
}

variableMap := map[string]*core.Variable{
"var1": {
Expand Down Expand Up @@ -97,9 +89,6 @@ func getWorkflowSetup() {
},
}
workflows = []*admin.Workflow{workflow1, workflow2}
workflowListResponse = &admin.WorkflowList{
Workflows: workflows,
}
argsWf = []string{"workflow1"}
workflow.DefaultConfig.Latest = false
workflow.DefaultConfig.Version = ""
Expand Down Expand Up @@ -149,6 +138,26 @@ func TestGetWorkflowFuncWithError(t *testing.T) {
assert.NotNil(t, err)
})

t.Run("fetching all workflow success", func(t *testing.T) {
s := setup()
getWorkflowSetup()
var args []string
s.FetcherExt.OnFetchAllWorkflowsMatch(mock.Anything, mock.Anything,
mock.Anything, mock.Anything).Return([]*admin.NamedEntity{}, nil)
err := getWorkflowFunc(s.Ctx, args, s.CmdCtx)
assert.Nil(t, err)
})

t.Run("fetching all workflow error", func(t *testing.T) {
s := setup()
getWorkflowSetup()
var args []string
s.FetcherExt.OnFetchAllWorkflowsMatch(mock.Anything, mock.Anything,
mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error fetching all workflows"))
err := getWorkflowFunc(s.Ctx, args, s.CmdCtx)
assert.NotNil(t, err)
})

}

func TestGetWorkflowFuncLatestWithTable(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions flytectl/pkg/ext/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type AdminFetcherExtInterface interface {
// FetchTaskVersion fetches particular version of task in a project, domain
FetchTaskVersion(ctx context.Context, name, version, project, domain string) (*admin.Task, error)

// FetchAllWorkflows fetches all workflows in project domain
FetchAllWorkflows(ctx context.Context, project, domain string, filter filters.Filters) ([]*admin.NamedEntity, error)

// FetchAllVerOfWorkflow fetches all versions of task in a project, domain
FetchAllVerOfWorkflow(ctx context.Context, name, project, domain string, filter filters.Filters) ([]*admin.Workflow, error)

Expand Down
41 changes: 41 additions & 0 deletions flytectl/pkg/ext/mocks/admin_fetcher_ext_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions flytectl/pkg/ext/workflow_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ func (a *AdminFetcherExtClient) FetchAllVerOfWorkflow(ctx context.Context, workf
return wList.Workflows, nil
}

// FetchAllWorkflows fetches all workflows in project domain
func (a *AdminFetcherExtClient) FetchAllWorkflows(ctx context.Context, project, domain string, filter filters.Filters) ([]*admin.NamedEntity, error) {
tranformFilters, err := filters.BuildNamedEntityListRequest(filter, project, domain, core.ResourceType_WORKFLOW)
if err != nil {
return nil, err
}
wList, err := a.AdminServiceClient().ListNamedEntities(ctx, tranformFilters)
if err != nil {
return nil, err
}
if len(wList.Entities) == 0 {
return nil, fmt.Errorf("no workflow retrieved for %v project %v domain", project, domain)
}
return wList.Entities, nil
}

// FetchWorkflowLatestVersion fetches latest version for given workflow name
func (a *AdminFetcherExtClient) FetchWorkflowLatestVersion(ctx context.Context, name, project, domain string, filter filters.Filters) (*admin.Workflow, error) {
// Fetch the latest version of the workflow.
Expand Down
42 changes: 39 additions & 3 deletions flytectl/pkg/ext/workflow_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
)

var (
workflowListResponse *admin.WorkflowList
workflowFilter = filters.Filters{}
workflowResponse *admin.Workflow
workflowListResponse *admin.WorkflowList
namedEntityListResponse *admin.NamedEntityList
workflowFilter = filters.Filters{}
workflowResponse *admin.Workflow
)

func getWorkflowFetcherSetup() {
Expand Down Expand Up @@ -79,14 +80,49 @@ func getWorkflowFetcherSetup() {
},
}

namedEntity := &admin.NamedEntity{
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Name: "workflow",
},
ResourceType: core.ResourceType_WORKFLOW,
}

workflows := []*admin.Workflow{workflow2, workflow1}

namedEntityListResponse = &admin.NamedEntityList{
Entities: []*admin.NamedEntity{namedEntity},
}
workflowListResponse = &admin.WorkflowList{
Workflows: workflows,
}
workflowResponse = workflows[0]
}

func TestFetchAllWorkflows(t *testing.T) {
t.Run("non empty response", func(t *testing.T) {
getWorkflowFetcherSetup()
adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(namedEntityListResponse, nil)
_, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter)
assert.Nil(t, err)
})
t.Run("empty response", func(t *testing.T) {
getWorkflowFetcherSetup()
namedEntityListResponse := &admin.NamedEntityList{}
adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(namedEntityListResponse, nil)
_, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter)
assert.Equal(t, fmt.Errorf("no workflow retrieved for project project domain domain"), err)
})
}

func TestFetchAllWorkflowsError(t *testing.T) {
getWorkflowFetcherSetup()
adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed"))
_, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter)
assert.Equal(t, fmt.Errorf("failed"), err)
}

func TestFetchAllVerOfWorkflow(t *testing.T) {
getWorkflowFetcherSetup()
adminClient.OnListWorkflowsMatch(mock.Anything, mock.Anything).Return(workflowListResponse, nil)
Expand Down
20 changes: 20 additions & 0 deletions flytectl/pkg/filters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strconv"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

func BuildResourceListRequestWithName(c Filters, project, domain, name string) (*admin.ResourceListRequest, error) {
Expand All @@ -29,6 +30,25 @@ func BuildResourceListRequestWithName(c Filters, project, domain, name string) (
return request, nil
}

func BuildNamedEntityListRequest(c Filters, project, domain string, resourceType core.ResourceType) (*admin.NamedEntityListRequest, error) {
fieldSelector, err := Transform(SplitTerms(c.FieldSelector))
if err != nil {
return nil, err
}
request := &admin.NamedEntityListRequest{
Limit: uint32(c.Limit),
Token: getToken(c),
Filters: fieldSelector,
Project: project,
Domain: domain,
ResourceType: resourceType,
}
if sort := buildSortingRequest(c); sort != nil {
request.SortBy = sort
}
return request, nil
}

func BuildProjectListRequest(c Filters) (*admin.ProjectListRequest, error) {
fieldSelector, err := Transform(SplitTerms(c.FieldSelector))
if err != nil {
Expand Down

0 comments on commit a4702fb

Please sign in to comment.