From d6351166365f3e97b8312598cca22ca500655c63 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Fri, 8 Apr 2022 23:05:57 +0530 Subject: [PATCH] Filtering on activation state of workflows (#305) * Added named entity fetch on workflow which also support state filter Signed-off-by: Prafulla Mahindrakar * Fixed docs Signed-off-by: Prafulla Mahindrakar --- cmd/get/workflow.go | 27 +++++++++---- cmd/get/workflow_test.go | 37 ++++++++++------- pkg/ext/fetcher.go | 3 ++ pkg/ext/mocks/admin_fetcher_ext_interface.go | 41 +++++++++++++++++++ pkg/ext/workflow_fetcher.go | 16 ++++++++ pkg/ext/workflow_fetcher_test.go | 42 ++++++++++++++++++-- pkg/filters/util.go | 20 ++++++++++ 7 files changed, 162 insertions(+), 24 deletions(-) diff --git a/cmd/get/workflow.go b/cmd/get/workflow.go index f2b3650250..c428c2d6a7 100644 --- a/cmd/get/workflow.go +++ b/cmd/get/workflow.go @@ -23,7 +23,7 @@ Retrieve all the workflows within project and domain (workflow/workflows can be flytectl get workflow -p flytesnacks -d development -Retrieve workflow by name within project and domain: +Retrieve all versions of a workflow by name within project and domain: :: @@ -103,6 +103,14 @@ var listWorkflowColumns = []printer.Column{ {Header: "Created At", JSONPath: "$.closure.createdAt"}, } +var namedEntityColumns = []printer.Column{ + {Header: "Project", JSONPath: "$.id.project"}, + {Header: "Domain", JSONPath: "$.id.domain"}, + {Header: "Name", JSONPath: "$.id.name"}, + {Header: "Description", JSONPath: "$.metadata.description"}, + {Header: "State", JSONPath: "$.metadata.state"}, +} + func WorkflowToProtoMessages(l []*admin.Workflow) []proto.Message { messages := make([]proto.Message, 0, len(l)) for _, m := range l { @@ -111,6 +119,14 @@ func WorkflowToProtoMessages(l []*admin.Workflow) []proto.Message { return messages } +func NamedEntityToProtoMessages(l []*admin.NamedEntity) []proto.Message { + messages := make([]proto.Message, 0, len(l)) + for _, m := range l { + messages = append(messages, m) + } + return messages +} + func WorkflowToTableProtoMessages(l []*admin.Workflow) []proto.Message { messages := make([]proto.Message, 0, len(l)) for _, m := range l { @@ -155,16 +171,13 @@ func getWorkflowFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandC return adminPrinter.Print(config.GetConfig().MustOutputFormat(), columns, WorkflowToProtoMessages(workflows)...) } - workflows, err = cmdCtx.AdminFetcherExt().FetchAllVerOfWorkflow(ctx, "", config.GetConfig().Project, config.GetConfig().Domain, workflowconfig.DefaultConfig.Filter) + nameEntities, err := cmdCtx.AdminFetcherExt().FetchAllWorkflows(ctx, config.GetConfig().Project, config.GetConfig().Domain, workflowconfig.DefaultConfig.Filter) if err != nil { return err } - logger.Debugf(ctx, "Retrieved %v workflows", len(workflows)) - if config.GetConfig().MustOutputFormat() == printer.OutputFormatTABLE { - return adminPrinter.Print(config.GetConfig().MustOutputFormat(), listWorkflowColumns, WorkflowToTableProtoMessages(workflows)...) - } - return adminPrinter.Print(config.GetConfig().MustOutputFormat(), listWorkflowColumns, WorkflowToProtoMessages(workflows)...) + logger.Debugf(ctx, "Retrieved %v workflows", len(nameEntities)) + return adminPrinter.Print(config.GetConfig().MustOutputFormat(), namedEntityColumns, NamedEntityToProtoMessages(nameEntities)...) } // FetchWorkflowForName fetches the workflow give it name. diff --git a/cmd/get/workflow_test.go b/cmd/get/workflow_test.go index d0b50007ca..c622d8e3fc 100644 --- a/cmd/get/workflow_test.go +++ b/cmd/get/workflow_test.go @@ -22,20 +22,12 @@ import ( ) var ( - resourceListRequestWorkflow *admin.ResourceListRequest - workflowListResponse *admin.WorkflowList - argsWf []string - workflow1 *admin.Workflow - workflows []*admin.Workflow + argsWf []string + workflow1 *admin.Workflow + workflows []*admin.Workflow ) func getWorkflowSetup() { - resourceListRequestWorkflow = &admin.ResourceListRequest{ - Id: &admin.NamedEntityIdentifier{ - Project: projectValue, - Domain: domainValue, - }, - } variableMap := map[string]*core.Variable{ "var1": { @@ -97,9 +89,6 @@ func getWorkflowSetup() { }, } workflows = []*admin.Workflow{workflow1, workflow2} - workflowListResponse = &admin.WorkflowList{ - Workflows: workflows, - } argsWf = []string{"workflow1"} workflow.DefaultConfig.Latest = false workflow.DefaultConfig.Version = "" @@ -149,6 +138,26 @@ func TestGetWorkflowFuncWithError(t *testing.T) { assert.NotNil(t, err) }) + t.Run("fetching all workflow success", func(t *testing.T) { + s := setup() + getWorkflowSetup() + var args []string + s.FetcherExt.OnFetchAllWorkflowsMatch(mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return([]*admin.NamedEntity{}, nil) + err := getWorkflowFunc(s.Ctx, args, s.CmdCtx) + assert.Nil(t, err) + }) + + t.Run("fetching all workflow error", func(t *testing.T) { + s := setup() + getWorkflowSetup() + var args []string + s.FetcherExt.OnFetchAllWorkflowsMatch(mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error fetching all workflows")) + err := getWorkflowFunc(s.Ctx, args, s.CmdCtx) + assert.NotNil(t, err) + }) + } func TestGetWorkflowFuncLatestWithTable(t *testing.T) { diff --git a/pkg/ext/fetcher.go b/pkg/ext/fetcher.go index 2b8b96f793..2c40e388f6 100644 --- a/pkg/ext/fetcher.go +++ b/pkg/ext/fetcher.go @@ -49,6 +49,9 @@ type AdminFetcherExtInterface interface { // FetchTaskVersion fetches particular version of task in a project, domain FetchTaskVersion(ctx context.Context, name, version, project, domain string) (*admin.Task, error) + // FetchAllWorkflows fetches all workflows in project domain + FetchAllWorkflows(ctx context.Context, project, domain string, filter filters.Filters) ([]*admin.NamedEntity, error) + // FetchAllVerOfWorkflow fetches all versions of task in a project, domain FetchAllVerOfWorkflow(ctx context.Context, name, project, domain string, filter filters.Filters) ([]*admin.Workflow, error) diff --git a/pkg/ext/mocks/admin_fetcher_ext_interface.go b/pkg/ext/mocks/admin_fetcher_ext_interface.go index 623199b9f9..18906ce863 100644 --- a/pkg/ext/mocks/admin_fetcher_ext_interface.go +++ b/pkg/ext/mocks/admin_fetcher_ext_interface.go @@ -176,6 +176,47 @@ func (_m *AdminFetcherExtInterface) FetchAllVerOfWorkflow(ctx context.Context, n return r0, r1 } +type AdminFetcherExtInterface_FetchAllWorkflows struct { + *mock.Call +} + +func (_m AdminFetcherExtInterface_FetchAllWorkflows) Return(_a0 []*admin.NamedEntity, _a1 error) *AdminFetcherExtInterface_FetchAllWorkflows { + return &AdminFetcherExtInterface_FetchAllWorkflows{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *AdminFetcherExtInterface) OnFetchAllWorkflows(ctx context.Context, project string, domain string, filter filters.Filters) *AdminFetcherExtInterface_FetchAllWorkflows { + c := _m.On("FetchAllWorkflows", ctx, project, domain, filter) + return &AdminFetcherExtInterface_FetchAllWorkflows{Call: c} +} + +func (_m *AdminFetcherExtInterface) OnFetchAllWorkflowsMatch(matchers ...interface{}) *AdminFetcherExtInterface_FetchAllWorkflows { + c := _m.On("FetchAllWorkflows", matchers...) + return &AdminFetcherExtInterface_FetchAllWorkflows{Call: c} +} + +// FetchAllWorkflows provides a mock function with given fields: ctx, project, domain, filter +func (_m *AdminFetcherExtInterface) FetchAllWorkflows(ctx context.Context, project string, domain string, filter filters.Filters) ([]*admin.NamedEntity, error) { + ret := _m.Called(ctx, project, domain, filter) + + var r0 []*admin.NamedEntity + if rf, ok := ret.Get(0).(func(context.Context, string, string, filters.Filters) []*admin.NamedEntity); ok { + r0 = rf(ctx, project, domain, filter) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*admin.NamedEntity) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, filters.Filters) error); ok { + r1 = rf(ctx, project, domain, filter) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type AdminFetcherExtInterface_FetchExecution struct { *mock.Call } diff --git a/pkg/ext/workflow_fetcher.go b/pkg/ext/workflow_fetcher.go index 734298a5d4..98438e06f8 100644 --- a/pkg/ext/workflow_fetcher.go +++ b/pkg/ext/workflow_fetcher.go @@ -26,6 +26,22 @@ func (a *AdminFetcherExtClient) FetchAllVerOfWorkflow(ctx context.Context, workf return wList.Workflows, nil } +// FetchAllWorkflows fetches all workflows in project domain +func (a *AdminFetcherExtClient) FetchAllWorkflows(ctx context.Context, project, domain string, filter filters.Filters) ([]*admin.NamedEntity, error) { + tranformFilters, err := filters.BuildNamedEntityListRequest(filter, project, domain, core.ResourceType_WORKFLOW) + if err != nil { + return nil, err + } + wList, err := a.AdminServiceClient().ListNamedEntities(ctx, tranformFilters) + if err != nil { + return nil, err + } + if len(wList.Entities) == 0 { + return nil, fmt.Errorf("no workflow retrieved for %v project %v domain", project, domain) + } + return wList.Entities, nil +} + // FetchWorkflowLatestVersion fetches latest version for given workflow name func (a *AdminFetcherExtClient) FetchWorkflowLatestVersion(ctx context.Context, name, project, domain string, filter filters.Filters) (*admin.Workflow, error) { // Fetch the latest version of the workflow. diff --git a/pkg/ext/workflow_fetcher_test.go b/pkg/ext/workflow_fetcher_test.go index 9ae96612a6..da027566e2 100644 --- a/pkg/ext/workflow_fetcher_test.go +++ b/pkg/ext/workflow_fetcher_test.go @@ -17,9 +17,10 @@ import ( ) var ( - workflowListResponse *admin.WorkflowList - workflowFilter = filters.Filters{} - workflowResponse *admin.Workflow + workflowListResponse *admin.WorkflowList + namedEntityListResponse *admin.NamedEntityList + workflowFilter = filters.Filters{} + workflowResponse *admin.Workflow ) func getWorkflowFetcherSetup() { @@ -79,14 +80,49 @@ func getWorkflowFetcherSetup() { }, } + namedEntity := &admin.NamedEntity{ + Id: &admin.NamedEntityIdentifier{ + Project: "project", + Domain: "domain", + Name: "workflow", + }, + ResourceType: core.ResourceType_WORKFLOW, + } + workflows := []*admin.Workflow{workflow2, workflow1} + namedEntityListResponse = &admin.NamedEntityList{ + Entities: []*admin.NamedEntity{namedEntity}, + } workflowListResponse = &admin.WorkflowList{ Workflows: workflows, } workflowResponse = workflows[0] } +func TestFetchAllWorkflows(t *testing.T) { + t.Run("non empty response", func(t *testing.T) { + getWorkflowFetcherSetup() + adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(namedEntityListResponse, nil) + _, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter) + assert.Nil(t, err) + }) + t.Run("empty response", func(t *testing.T) { + getWorkflowFetcherSetup() + namedEntityListResponse := &admin.NamedEntityList{} + adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(namedEntityListResponse, nil) + _, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter) + assert.Equal(t, fmt.Errorf("no workflow retrieved for project project domain domain"), err) + }) +} + +func TestFetchAllWorkflowsError(t *testing.T) { + getWorkflowFetcherSetup() + adminClient.OnListNamedEntitiesMatch(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed")) + _, err := adminFetcherExt.FetchAllWorkflows(ctx, "project", "domain", workflowFilter) + assert.Equal(t, fmt.Errorf("failed"), err) +} + func TestFetchAllVerOfWorkflow(t *testing.T) { getWorkflowFetcherSetup() adminClient.OnListWorkflowsMatch(mock.Anything, mock.Anything).Return(workflowListResponse, nil) diff --git a/pkg/filters/util.go b/pkg/filters/util.go index 6d2501e4d5..b8584a7a57 100644 --- a/pkg/filters/util.go +++ b/pkg/filters/util.go @@ -4,6 +4,7 @@ import ( "strconv" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) func BuildResourceListRequestWithName(c Filters, project, domain, name string) (*admin.ResourceListRequest, error) { @@ -29,6 +30,25 @@ func BuildResourceListRequestWithName(c Filters, project, domain, name string) ( return request, nil } +func BuildNamedEntityListRequest(c Filters, project, domain string, resourceType core.ResourceType) (*admin.NamedEntityListRequest, error) { + fieldSelector, err := Transform(SplitTerms(c.FieldSelector)) + if err != nil { + return nil, err + } + request := &admin.NamedEntityListRequest{ + Limit: uint32(c.Limit), + Token: getToken(c), + Filters: fieldSelector, + Project: project, + Domain: domain, + ResourceType: resourceType, + } + if sort := buildSortingRequest(c); sort != nil { + request.SortBy = sort + } + return request, nil +} + func BuildProjectListRequest(c Filters) (*admin.ProjectListRequest, error) { fieldSelector, err := Transform(SplitTerms(c.FieldSelector)) if err != nil {