diff --git a/flyteadmin/cmd/entrypoints/clusterresource.go b/flyteadmin/cmd/entrypoints/clusterresource.go index 8bf7a76dff..8ea4cada30 100644 --- a/flyteadmin/cmd/entrypoints/clusterresource.go +++ b/flyteadmin/cmd/entrypoints/clusterresource.go @@ -3,7 +3,12 @@ package entrypoints import ( "context" - "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces" + "github.com/flyteorg/flyteadmin/pkg/clusterresource/impl" + "github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces" + execClusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces" + "github.com/flyteorg/flyteadmin/pkg/manager/impl/resources" + "github.com/flyteorg/flyteadmin/pkg/repositories" + repositoryConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config" "github.com/flyteorg/flytestdlib/promutils" @@ -29,7 +34,7 @@ func getClusterResourceController(ctx context.Context, scope promutils.Scope, co initializationErrorCounter := scope.MustNewCounter( "flyteclient_initialization_error", "count of errors encountered initializing a flyte client from kube config") - var listTargetsProvider interfaces.ListTargetsInterface + var listTargetsProvider execClusterIfaces.ListTargetsInterface var err error if len(configuration.ClusterConfiguration().GetClusterConfigs()) == 0 { serverConfig := config.GetConfig() @@ -41,12 +46,22 @@ func getClusterResourceController(ctx context.Context, scope promutils.Scope, co panic(err) } - clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx) - if err != nil { - panic(err) + var adminDataProvider interfaces.FlyteAdminDataProvider + if configuration.ClusterResourceConfiguration().IsStandaloneDeployment() { + clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx) + if err != nil { + panic(err) + } + adminDataProvider = impl.NewAdminServiceDataProvider(clientSet.AdminClient()) + } else { + dbConfig := repositoryConfig.NewDbConfig(configuration.ApplicationConfiguration().GetDbConfig()) + db := repositories.GetRepository( + repositories.POSTGRES, dbConfig, scope.NewSubScope("database")) + + adminDataProvider = impl.NewDatabaseAdminDataProvider(db, configuration, resources.NewResourceManager(db, configuration.ApplicationConfiguration())) } - return clusterresource.NewClusterResourceController(clientSet.AdminClient(), listTargetsProvider, scope) + return clusterresource.NewClusterResourceController(adminDataProvider, listTargetsProvider, scope) } var controllerRunCmd = &cobra.Command{ diff --git a/flyteadmin/pkg/clusterresource/controller.go b/flyteadmin/pkg/clusterresource/controller.go index 25054a3d3a..bcd2736968 100644 --- a/flyteadmin/pkg/clusterresource/controller.go +++ b/flyteadmin/pkg/clusterresource/controller.go @@ -14,10 +14,9 @@ import ( "google.golang.org/grpc/status" - "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" - + "github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces" "github.com/flyteorg/flyteadmin/pkg/executioncluster" + executionclusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces" "github.com/flyteorg/flyteadmin/pkg/runtime" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -96,14 +95,9 @@ type controller struct { metrics controllerMetrics lastAppliedTemplateDir string // Map of [namespace -> [templateFileName -> last modified time]] - appliedTemplates NamespaceCache - adminClient service.AdminServiceClient - listTargets interfaces.ListTargetsInterface -} - -var descCreatedAtSortParam = &admin.Sort{ - Direction: admin.Sort_DESCENDING, - Key: "created_at", + appliedTemplates NamespaceCache + adminDataProvider interfaces.FlyteAdminDataProvider + listTargets executionclusterIfaces.ListTargetsInterface } func (c *controller) templateAlreadyApplied(namespace NamespaceName, templateFile os.FileInfo) bool { @@ -199,23 +193,19 @@ func (c *controller) getCustomTemplateValues( } collectedErrs := make([]error, 0) // All override values saved in the database take precedence over the domain-specific defaults. - resource, err := c.adminClient.GetProjectDomainAttributes(ctx, &admin.ProjectDomainAttributesGetRequest{ - Project: project, - Domain: domain, - ResourceType: admin.MatchableResource_CLUSTER_RESOURCE, - }) + attributes, err := c.adminDataProvider.GetClusterResourceAttributes(ctx, project, domain) if err != nil { s, ok := status.FromError(err) if !ok || s.Code() != codes.NotFound { collectedErrs = append(collectedErrs, err) } } - if resource != nil && resource.Attributes != nil && resource.Attributes.MatchingAttributes != nil && - resource.Attributes.MatchingAttributes.GetClusterResourceAttributes() != nil { - for templateKey, templateValue := range resource.Attributes.MatchingAttributes.GetClusterResourceAttributes().Attributes { + if attributes != nil && attributes.Attributes != nil { + for templateKey, templateValue := range attributes.Attributes { customTemplateValues[fmt.Sprintf(templateVariableFormat, templateKey)] = templateValue } } + if len(collectedErrs) > 0 { return nil, errors.NewCollectedFlyteAdminError(codes.InvalidArgument, collectedErrs) } @@ -550,32 +540,6 @@ func (c *controller) createPatch(gvk schema.GroupVersionKind, currentObj *unstru return patch, patchType, nil } -var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED) - -func (c *controller) listAllProjects(ctx context.Context) ([]*admin.Project, error) { - projects := make([]*admin.Project, 0) - listReq := &admin.ProjectListRequest{ - Limit: 100, - Filters: activeProjectsFilter, - // Prefer to sync projects most newly created to ensure their resources get created first when other resources exist. - SortBy: descCreatedAtSortParam, - } - - // Iterate through all pages of projects - for { - projectResp, err := c.adminClient.ListProjects(ctx, listReq) - if err != nil { - return nil, err - } - projects = append(projects, projectResp.Projects...) - if len(projectResp.Token) == 0 { - break - } - listReq.Token = projectResp.Token - } - return projects, nil -} - func (c *controller) Sync(ctx context.Context) error { defer func() { if err := recover(); err != nil { @@ -586,7 +550,7 @@ func (c *controller) Sync(ctx context.Context) error { c.metrics.SyncStarted.Inc() logger.Debugf(ctx, "Running an invocation of ClusterResource Sync") - projects, err := c.listAllProjects(ctx) + projects, err := c.adminDataProvider.GetProjects(ctx) if err != nil { return err } @@ -603,7 +567,7 @@ func (c *controller) Sync(ctx context.Context) error { errs = append(errs, err) } - for _, project := range projects { + for _, project := range projects.Projects { for _, domain := range *domains { namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Id, domain.Name) customTemplateValues, err := c.getCustomTemplateValues( @@ -669,14 +633,14 @@ func newMetrics(scope promutils.Scope) controllerMetrics { } } -func NewClusterResourceController(adminClient service.AdminServiceClient, listTargets interfaces.ListTargetsInterface, scope promutils.Scope) Controller { +func NewClusterResourceController(adminDataProvider interfaces.FlyteAdminDataProvider, listTargets executionclusterIfaces.ListTargetsInterface, scope promutils.Scope) Controller { config := runtime.NewConfigurationProvider() return &controller{ - adminClient: adminClient, - config: config, - listTargets: listTargets, - poller: make(chan struct{}), - metrics: newMetrics(scope), - appliedTemplates: make(map[string]map[string]time.Time), + adminDataProvider: adminDataProvider, + config: config, + listTargets: listTargets, + poller: make(chan struct{}), + metrics: newMetrics(scope), + appliedTemplates: make(map[string]map[string]time.Time), } } diff --git a/flyteadmin/pkg/clusterresource/controller_test.go b/flyteadmin/pkg/clusterresource/controller_test.go index 47a5a525e5..7d6b1008fe 100644 --- a/flyteadmin/pkg/clusterresource/controller_test.go +++ b/flyteadmin/pkg/clusterresource/controller_test.go @@ -7,14 +7,16 @@ import ( "testing" "time" - "github.com/stretchr/testify/mock" + "github.com/flyteorg/flyteadmin/pkg/errors" + "google.golang.org/grpc/codes" - "github.com/flyteorg/flyteadmin/pkg/executioncluster/mocks" + "github.com/flyteorg/flyteadmin/pkg/clusterresource/mocks" + execClusterMocks "github.com/flyteorg/flyteadmin/pkg/executioncluster/mocks" runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" - clientMocks "github.com/flyteorg/flyteidl/clients/go/admin/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" mockScope "github.com/flyteorg/flytestdlib/promutils" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -152,28 +154,16 @@ func TestPopulateDefaultTemplateValues(t *testing.T) { } func TestGetCustomTemplateValues(t *testing.T) { - adminClient := clientMocks.AdminServiceClient{} - adminClient.OnGetProjectDomainAttributesMatch(mock.Anything, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool { - return req.Project == proj && req.Domain == domain - })).Return(&admin.ProjectDomainAttributesGetResponse{ - Attributes: &admin.ProjectDomainAttributes{ - Project: proj, - Domain: domain, - MatchingAttributes: &admin.MatchingAttributes{ - Target: &admin.MatchingAttributes_ClusterResourceAttributes{ - ClusterResourceAttributes: &admin.ClusterResourceAttributes{ - Attributes: map[string]string{ - "var1": "val1", - "var2": "val2", - }, - }, - }, - }, + adminDataProvider := mocks.FlyteAdminDataProvider{} + adminDataProvider.OnGetClusterResourceAttributesMatch(mock.Anything, proj, domain).Return(&admin.ClusterResourceAttributes{ + Attributes: map[string]string{ + "var1": "val1", + "var2": "val2", }, }, nil) testController := controller{ - adminClient: &adminClient, + adminDataProvider: &adminDataProvider, } domainTemplateValues := templateValuesType{ "{{ var1 }}": "i'm getting overwritten", @@ -191,12 +181,11 @@ func TestGetCustomTemplateValues(t *testing.T) { } func TestGetCustomTemplateValues_NothingToOverride(t *testing.T) { - adminClient := clientMocks.AdminServiceClient{} - adminClient.OnGetProjectDomainAttributesMatch(mock.Anything, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool { - return req.Project == proj && req.Domain == domain - })).Return(&admin.ProjectDomainAttributesGetResponse{}, nil) + adminDataProvider := mocks.FlyteAdminDataProvider{} + adminDataProvider.OnGetClusterResourceAttributesMatch(mock.Anything, proj, domain).Return( + nil, errors.NewFlyteAdminError(codes.NotFound, "foo")) testController := controller{ - adminClient: &adminClient, + adminDataProvider: &adminDataProvider, } customTemplateValues, err := testController.getCustomTemplateValues(context.Background(), proj, domain, templateValuesType{ "{{ var1 }}": "val1", @@ -344,11 +333,11 @@ metadata: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - adminClient := clientMocks.AdminServiceClient{} - adminClient.OnGetProjectDomainAttributesMatch(mock.Anything, mock.Anything).Return(&admin.ProjectDomainAttributesGetResponse{}, nil) + adminDataProvider := mocks.FlyteAdminDataProvider{} + adminDataProvider.OnGetClusterResourceAttributesMatch(mock.Anything, mock.Anything, mock.Anything).Return(&admin.ClusterResourceAttributes{}, nil) mockPromScope := mockScope.NewTestScope() - c := NewClusterResourceController(&adminClient, &mocks.ListTargetsInterface{}, mockPromScope) + c := NewClusterResourceController(&adminDataProvider, &execClusterMocks.ListTargetsInterface{}, mockPromScope) testController := c.(*controller) gotK8sManifest, err := testController.createResourceFromTemplate(tt.args.ctx, tt.args.templateDir, tt.args.templateFileName, tt.args.project, tt.args.domain, tt.args.namespace, tt.args.templateValues, tt.args.customTemplateValues) diff --git a/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go new file mode 100644 index 0000000000..41d92b29a3 --- /dev/null +++ b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go @@ -0,0 +1,66 @@ +package impl + +import ( + "context" + "fmt" + + "github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" +) + +// Implementation of an interfaces.FlyteAdminDataProvider which fetches data using a flyteadmin service client +type serviceAdminProvider struct { + adminClient service.AdminServiceClient +} + +func (p serviceAdminProvider) GetClusterResourceAttributes(ctx context.Context, project, domain string) (*admin.ClusterResourceAttributes, error) { + resource, err := p.adminClient.GetProjectDomainAttributes(ctx, &admin.ProjectDomainAttributesGetRequest{ + Project: project, + Domain: domain, + ResourceType: admin.MatchableResource_CLUSTER_RESOURCE, + }) + if err != nil { + return nil, err + } + if resource != nil && resource.Attributes != nil && resource.Attributes.MatchingAttributes != nil && + resource.Attributes.MatchingAttributes.GetClusterResourceAttributes() != nil { + return resource.Attributes.MatchingAttributes.GetClusterResourceAttributes(), nil + } + return nil, NewMissingEntityError("cluster resource attributes") +} + +var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED) + +func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { + projects := make([]*admin.Project, 0) + listReq := &admin.ProjectListRequest{ + Limit: 100, + Filters: activeProjectsFilter, + // Prefer to sync projects most newly created to ensure their resources get created first when other resources exist. + SortBy: &descCreatedAtSortParam, + } + + // Iterate through all pages of projects + for { + projectResp, err := p.adminClient.ListProjects(ctx, listReq) + if err != nil { + return nil, err + } + projects = append(projects, projectResp.Projects...) + if len(projectResp.Token) == 0 { + break + } + listReq.Token = projectResp.Token + } + return &admin.Projects{ + Projects: projects, + }, nil +} + +func NewAdminServiceDataProvider( + adminClient service.AdminServiceClient) interfaces.FlyteAdminDataProvider { + return &serviceAdminProvider{ + adminClient: adminClient, + } +} diff --git a/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go new file mode 100644 index 0000000000..79d980b7e7 --- /dev/null +++ b/flyteadmin/pkg/clusterresource/impl/admin_service_data_provider_test.go @@ -0,0 +1,119 @@ +package impl + +import ( + "context" + "testing" + + "github.com/flyteorg/flyteidl/clients/go/admin/mocks" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestServiceGetClusterResourceAttributes(t *testing.T) { + ctx := context.TODO() + project := "flytesnacks" + domain := "development" + t.Run("happy case", func(t *testing.T) { + var attributes = map[string]string{ + "K1": "V1", + "K2": "V2", + } + mockAdmin := mocks.AdminServiceClient{} + mockAdmin.OnGetProjectDomainAttributesMatch(ctx, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool { + return req.Project == project && req.Domain == domain && req.ResourceType == admin.MatchableResource_CLUSTER_RESOURCE + })).Return(&admin.ProjectDomainAttributesGetResponse{ + Attributes: &admin.ProjectDomainAttributes{ + MatchingAttributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ClusterResourceAttributes{ + ClusterResourceAttributes: &admin.ClusterResourceAttributes{ + Attributes: attributes, + }, + }, + }, + }, + }, nil) + + provider := serviceAdminProvider{ + adminClient: &mockAdmin, + } + attrs, err := provider.GetClusterResourceAttributes(context.TODO(), project, domain) + assert.NoError(t, err) + assert.EqualValues(t, attrs.Attributes, attributes) + }) + t.Run("admin service error", func(t *testing.T) { + mockAdmin := mocks.AdminServiceClient{} + mockAdmin.OnGetProjectDomainAttributesMatch(ctx, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool { + return req.Project == project && req.Domain == domain && req.ResourceType == admin.MatchableResource_CLUSTER_RESOURCE + })).Return(&admin.ProjectDomainAttributesGetResponse{}, errFoo) + + provider := serviceAdminProvider{ + adminClient: &mockAdmin, + } + _, err := provider.GetClusterResourceAttributes(context.TODO(), project, domain) + assert.EqualError(t, err, errFoo.Error()) + }) + t.Run("wonky admin service response", func(t *testing.T) { + mockAdmin := mocks.AdminServiceClient{} + mockAdmin.OnGetProjectDomainAttributesMatch(ctx, mock.MatchedBy(func(req *admin.ProjectDomainAttributesGetRequest) bool { + return req.Project == project && req.Domain == domain && req.ResourceType == admin.MatchableResource_CLUSTER_RESOURCE + })).Return(&admin.ProjectDomainAttributesGetResponse{ + Attributes: &admin.ProjectDomainAttributes{ + MatchingAttributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ExecutionQueueAttributes{ + ExecutionQueueAttributes: &admin.ExecutionQueueAttributes{ + Tags: []string{"foo", "bar", "baz"}, + }, + }, + }, + }, + }, nil) + + provider := serviceAdminProvider{ + adminClient: &mockAdmin, + } + attrs, err := provider.GetClusterResourceAttributes(context.TODO(), project, domain) + assert.Nil(t, attrs) + s, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, s.Code(), codes.NotFound) + }) +} + +func TestServiceGetProjects(t *testing.T) { + ctx := context.TODO() + t.Run("happy case", func(t *testing.T) { + mockAdmin := mocks.AdminServiceClient{} + mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool { + return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at" + })).Return(&admin.Projects{ + Projects: []*admin.Project{ + { + Id: "flytesnacks", + }, + { + Id: "flyteexamples", + }, + }, + }, nil) + provider := serviceAdminProvider{ + adminClient: &mockAdmin, + } + projects, err := provider.GetProjects(ctx) + assert.NoError(t, err) + assert.Len(t, projects.Projects, 2) + }) + t.Run("admin error", func(t *testing.T) { + mockAdmin := mocks.AdminServiceClient{} + mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool { + return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at" + })).Return(nil, errFoo) + provider := serviceAdminProvider{ + adminClient: &mockAdmin, + } + _, err := provider.GetProjects(ctx) + assert.EqualError(t, err, errFoo.Error()) + }) +} diff --git a/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go new file mode 100644 index 0000000000..0b1063325c --- /dev/null +++ b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go @@ -0,0 +1,74 @@ +package impl + +import ( + "context" + + "github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces" + "github.com/flyteorg/flyteadmin/pkg/common" + managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces" + "github.com/flyteorg/flyteadmin/pkg/repositories" + repositoriesInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" + "github.com/flyteorg/flyteadmin/pkg/repositories/transformers" + runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" +) + +// Implementation of an interfaces.FlyteAdminDataProvider which fetches data directly from the provided database connection. +type dbAdminProvider struct { + db repositories.RepositoryInterface + config runtimeInterfaces.Configuration + resourceManager managerInterfaces.ResourceInterface +} + +func (p dbAdminProvider) GetClusterResourceAttributes(ctx context.Context, project, domain string) (*admin.ClusterResourceAttributes, error) { + resource, err := p.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{ + Project: project, + Domain: domain, + ResourceType: admin.MatchableResource_CLUSTER_RESOURCE, + }) + if err != nil { + return nil, err + } + if resource != nil && resource.Attributes != nil && resource.Attributes.GetClusterResourceAttributes() != nil { + return resource.Attributes.GetClusterResourceAttributes(), nil + } + return nil, NewMissingEntityError("cluster resource attributes") +} + +func (p dbAdminProvider) getDomains() []*admin.Domain { + configDomains := p.config.ApplicationConfiguration().GetDomainsConfig() + var domains = make([]*admin.Domain, len(*configDomains)) + for index, configDomain := range *configDomains { + domains[index] = &admin.Domain{ + Id: configDomain.ID, + Name: configDomain.Name, + } + } + return domains +} + +func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { + filter, err := common.NewSingleValueFilter(common.Project, common.NotEqual, "state", int32(admin.Project_ARCHIVED)) + if err != nil { + return nil, err + } + projectModels, err := p.db.ProjectRepo().List(ctx, repositoriesInterfaces.ListResourceInput{ + SortParameter: descCreatedAtSortDBParam, + InlineFilters: []common.InlineFilter{filter}, + }) + if err != nil { + return nil, err + } + projects := transformers.FromProjectModels(projectModels, p.getDomains()) + return &admin.Projects{ + Projects: projects, + }, nil +} + +func NewDatabaseAdminDataProvider(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, resourceManager managerInterfaces.ResourceInterface) interfaces.FlyteAdminDataProvider { + return &dbAdminProvider{ + db: db, + config: config, + resourceManager: resourceManager, + } +} diff --git a/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go new file mode 100644 index 0000000000..452a559950 --- /dev/null +++ b/flyteadmin/pkg/clusterresource/impl/db_admin_data_provider_test.go @@ -0,0 +1,143 @@ +package impl + +import ( + "context" + "errors" + "testing" + + "github.com/flyteorg/flyteadmin/pkg/manager/interfaces" + "github.com/flyteorg/flyteadmin/pkg/manager/mocks" + repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" + repoMocks "github.com/flyteorg/flyteadmin/pkg/repositories/mocks" + "github.com/flyteorg/flyteadmin/pkg/repositories/models" + runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + configMocks "github.com/flyteorg/flyteadmin/pkg/runtime/mocks" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var errFoo = errors.New("foo") + +func TestGetClusterResourceAttributes(t *testing.T) { + project := "flytesnacks" + domain := "development" + var attributes = map[string]string{ + "K1": "V1", + "K2": "V2", + } + resourceManager := mocks.MockResourceManager{} + t.Run("happy case", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) { + return &interfaces.ResourceResponse{ + Project: request.Project, + Domain: request.Domain, + ResourceType: admin.MatchableResource_CLUSTER_RESOURCE.String(), + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ClusterResourceAttributes{ + ClusterResourceAttributes: &admin.ClusterResourceAttributes{ + Attributes: attributes, + }, + }, + }, + }, nil + } + provider := dbAdminProvider{ + resourceManager: &resourceManager, + } + attrs, err := provider.GetClusterResourceAttributes(context.TODO(), project, domain) + assert.NoError(t, err) + assert.EqualValues(t, attrs.Attributes, attributes) + }) + t.Run("error", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) { + return nil, errFoo + } + provider := dbAdminProvider{ + resourceManager: &resourceManager, + } + _, err := provider.GetClusterResourceAttributes(context.TODO(), project, domain) + assert.EqualError(t, err, errFoo.Error()) + }) + t.Run("weird db response", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) { + return &interfaces.ResourceResponse{ + Project: request.Project, + Domain: request.Domain, + ResourceType: admin.MatchableResource_EXECUTION_QUEUE.String(), + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ExecutionQueueAttributes{ + ExecutionQueueAttributes: &admin.ExecutionQueueAttributes{ + Tags: []string{"foo", "bar", "baz"}, + }, + }, + }, + }, nil + } + provider := dbAdminProvider{ + resourceManager: &resourceManager, + } + attrs, err := provider.GetClusterResourceAttributes(context.TODO(), project, domain) + assert.Nil(t, attrs) + s, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, s.Code(), codes.NotFound) + }) +} + +func TestGetProjects(t *testing.T) { + mockApplicationConfig := configMocks.MockApplicationProvider{} + mockApplicationConfig.SetDomainsConfig([]runtimeInterfaces.Domain{ + { + Name: "development", + }, + { + Name: "production", + }, + }) + mockConfig := configMocks.NewMockConfigurationProvider(&mockApplicationConfig, nil, nil, nil, nil, nil) + + t.Run("happy case", func(t *testing.T) { + mockRepo := repoMocks.NewMockRepository() + activeProjectState := int32(0) + mockRepo.(*repoMocks.MockRepository).ProjectRepoIface = &repoMocks.MockProjectRepo{ + ListProjectsFunction: func(ctx context.Context, input repoInterfaces.ListResourceInput) ([]models.Project, error) { + assert.Len(t, input.InlineFilters, 1) + assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "created_at desc") + return []models.Project{ + { + Identifier: "flytesnacks", + State: &activeProjectState, + }, + { + Identifier: "flyteexamples", + State: &activeProjectState, + }, + }, nil + }, + } + + provider := dbAdminProvider{ + db: mockRepo, + config: mockConfig, + } + projects, err := provider.GetProjects(context.TODO()) + assert.NoError(t, err) + assert.Len(t, projects.Projects, 2) + }) + t.Run("db error", func(t *testing.T) { + mockRepo := repoMocks.NewMockRepository() + mockRepo.(*repoMocks.MockRepository).ProjectRepoIface = &repoMocks.MockProjectRepo{ + ListProjectsFunction: func(ctx context.Context, input repoInterfaces.ListResourceInput) ([]models.Project, error) { + return nil, errFoo + }, + } + provider := dbAdminProvider{ + db: mockRepo, + config: mockConfig, + } + _, err := provider.GetProjects(context.TODO()) + assert.EqualError(t, err, errFoo.Error()) + }) +} diff --git a/flyteadmin/pkg/clusterresource/impl/shared.go b/flyteadmin/pkg/clusterresource/impl/shared.go new file mode 100644 index 0000000000..6ba07856fb --- /dev/null +++ b/flyteadmin/pkg/clusterresource/impl/shared.go @@ -0,0 +1,19 @@ +package impl + +import ( + "github.com/flyteorg/flyteadmin/pkg/common" + "github.com/flyteorg/flyteadmin/pkg/errors" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "google.golang.org/grpc/codes" +) + +func NewMissingEntityError(entity string) error { + return errors.NewFlyteAdminErrorf(codes.NotFound, "Failed to find [%s]", entity) +} + +var descCreatedAtSortParam = admin.Sort{ + Direction: admin.Sort_DESCENDING, + Key: "created_at", +} + +var descCreatedAtSortDBParam, _ = common.NewSortParameter(descCreatedAtSortParam) diff --git a/flyteadmin/pkg/clusterresource/interfaces/admin.go b/flyteadmin/pkg/clusterresource/interfaces/admin.go new file mode 100644 index 0000000000..fbd142224f --- /dev/null +++ b/flyteadmin/pkg/clusterresource/interfaces/admin.go @@ -0,0 +1,14 @@ +package interfaces + +import ( + "context" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" +) + +//go:generate mockery -name FlyteAdminDataProvider -output=../mocks -case=underscore + +type FlyteAdminDataProvider interface { + GetClusterResourceAttributes(ctx context.Context, project, domain string) (*admin.ClusterResourceAttributes, error) + GetProjects(ctx context.Context) (*admin.Projects, error) +} diff --git a/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go b/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go new file mode 100644 index 0000000000..66d3835b33 --- /dev/null +++ b/flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go @@ -0,0 +1,98 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + admin "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + + mock "github.com/stretchr/testify/mock" +) + +// FlyteAdminDataProvider is an autogenerated mock type for the FlyteAdminDataProvider type +type FlyteAdminDataProvider struct { + mock.Mock +} + +type FlyteAdminDataProvider_GetClusterResourceAttributes struct { + *mock.Call +} + +func (_m FlyteAdminDataProvider_GetClusterResourceAttributes) Return(_a0 *admin.ClusterResourceAttributes, _a1 error) *FlyteAdminDataProvider_GetClusterResourceAttributes { + return &FlyteAdminDataProvider_GetClusterResourceAttributes{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *FlyteAdminDataProvider) OnGetClusterResourceAttributes(ctx context.Context, project string, domain string) *FlyteAdminDataProvider_GetClusterResourceAttributes { + c := _m.On("GetClusterResourceAttributes", ctx, project, domain) + return &FlyteAdminDataProvider_GetClusterResourceAttributes{Call: c} +} + +func (_m *FlyteAdminDataProvider) OnGetClusterResourceAttributesMatch(matchers ...interface{}) *FlyteAdminDataProvider_GetClusterResourceAttributes { + c := _m.On("GetClusterResourceAttributes", matchers...) + return &FlyteAdminDataProvider_GetClusterResourceAttributes{Call: c} +} + +// GetClusterResourceAttributes provides a mock function with given fields: ctx, project, domain +func (_m *FlyteAdminDataProvider) GetClusterResourceAttributes(ctx context.Context, project string, domain string) (*admin.ClusterResourceAttributes, error) { + ret := _m.Called(ctx, project, domain) + + var r0 *admin.ClusterResourceAttributes + if rf, ok := ret.Get(0).(func(context.Context, string, string) *admin.ClusterResourceAttributes); ok { + r0 = rf(ctx, project, domain) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*admin.ClusterResourceAttributes) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, project, domain) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type FlyteAdminDataProvider_GetProjects struct { + *mock.Call +} + +func (_m FlyteAdminDataProvider_GetProjects) Return(_a0 *admin.Projects, _a1 error) *FlyteAdminDataProvider_GetProjects { + return &FlyteAdminDataProvider_GetProjects{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *FlyteAdminDataProvider) OnGetProjects(ctx context.Context) *FlyteAdminDataProvider_GetProjects { + c := _m.On("GetProjects", ctx) + return &FlyteAdminDataProvider_GetProjects{Call: c} +} + +func (_m *FlyteAdminDataProvider) OnGetProjectsMatch(matchers ...interface{}) *FlyteAdminDataProvider_GetProjects { + c := _m.On("GetProjects", matchers...) + return &FlyteAdminDataProvider_GetProjects{Call: c} +} + +// GetProjects provides a mock function with given fields: ctx +func (_m *FlyteAdminDataProvider) GetProjects(ctx context.Context) (*admin.Projects, error) { + ret := _m.Called(ctx) + + var r0 *admin.Projects + if rf, ok := ret.Get(0).(func(context.Context) *admin.Projects); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*admin.Projects) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/flyteadmin/pkg/repositories/mocks/repository.go b/flyteadmin/pkg/repositories/mocks/repository.go index ab0a43e292..de320d7386 100644 --- a/flyteadmin/pkg/repositories/mocks/repository.go +++ b/flyteadmin/pkg/repositories/mocks/repository.go @@ -15,7 +15,7 @@ type MockRepository struct { ExecutionEventRepoIface interfaces.ExecutionEventRepoInterface nodeExecutionRepo interfaces.NodeExecutionRepoInterface NodeExecutionEventRepoIface interfaces.NodeExecutionEventRepoInterface - projectRepo interfaces.ProjectRepoInterface + ProjectRepoIface interfaces.ProjectRepoInterface resourceRepo interfaces.ResourceRepoInterface taskExecutionRepo interfaces.TaskExecutionRepoInterface namedEntityRepo interfaces.NamedEntityRepoInterface @@ -60,7 +60,7 @@ func (r *MockRepository) NodeExecutionEventRepo() interfaces.NodeExecutionEventR } func (r *MockRepository) ProjectRepo() interfaces.ProjectRepoInterface { - return r.projectRepo + return r.ProjectRepoIface } func (r *MockRepository) ResourceRepo() interfaces.ResourceRepoInterface { @@ -82,7 +82,7 @@ func NewMockRepository() repositories.RepositoryInterface { launchPlanRepo: NewMockLaunchPlanRepo(), executionRepo: NewMockExecutionRepo(), nodeExecutionRepo: NewMockNodeExecutionRepo(), - projectRepo: NewMockProjectRepo(), + ProjectRepoIface: NewMockProjectRepo(), resourceRepo: NewMockResourceRepo(), taskExecutionRepo: NewMockTaskExecutionRepo(), namedEntityRepo: NewMockNamedEntityRepo(), diff --git a/flyteadmin/pkg/runtime/cluster_resource_provider.go b/flyteadmin/pkg/runtime/cluster_resource_provider.go index 9d5b064e8a..30812b51a4 100644 --- a/flyteadmin/pkg/runtime/cluster_resource_provider.go +++ b/flyteadmin/pkg/runtime/cluster_resource_provider.go @@ -36,6 +36,10 @@ func (p *ClusterResourceConfigurationProvider) GetCustomTemplateData() map[inter return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).CustomData } +func (p *ClusterResourceConfigurationProvider) IsStandaloneDeployment() bool { + return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).StandaloneDeployment +} + func NewClusterResourceConfigurationProvider() interfaces.ClusterResourceConfiguration { return &ClusterResourceConfigurationProvider{} } diff --git a/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go b/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go index 928caa2d71..feaa06cb3d 100644 --- a/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/cluster_resource_configuration.go @@ -42,7 +42,8 @@ type ClusterResourceConfig struct { foo: value: "baz" */ - CustomData map[DomainName]TemplateData `json:"customData"` + CustomData map[DomainName]TemplateData `json:"customData"` + StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"` } type ClusterResourceConfiguration interface { @@ -50,4 +51,5 @@ type ClusterResourceConfiguration interface { GetTemplateData() map[string]DataSource GetRefreshInterval() time.Duration GetCustomTemplateData() map[DomainName]TemplateData + IsStandaloneDeployment() bool } diff --git a/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go b/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go index e2d8548ae0..500f849ba5 100644 --- a/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go +++ b/flyteadmin/pkg/runtime/mocks/mock_cluster_resource_provider.go @@ -7,10 +7,11 @@ import ( ) type MockClusterResourceConfiguration struct { - TemplatePath string - TemplateData interfaces.TemplateData - RefreshInterval time.Duration - CustomTemplateData map[interfaces.DomainName]interfaces.TemplateData + TemplatePath string + TemplateData interfaces.TemplateData + RefreshInterval time.Duration + CustomTemplateData map[interfaces.DomainName]interfaces.TemplateData + StandaloneDeployment bool } func (c MockClusterResourceConfiguration) GetTemplatePath() string { @@ -28,6 +29,10 @@ func (c MockClusterResourceConfiguration) GetCustomTemplateData() map[interfaces return c.CustomTemplateData } +func (c MockClusterResourceConfiguration) IsStandaloneDeployment() bool { + return c.StandaloneDeployment +} + func NewMockClusterResourceConfiguration() interfaces.ClusterResourceConfiguration { return &MockClusterResourceConfiguration{} }