From 60b4c876ea105d4c79e3cad7d56fde6b9c208bcd Mon Sep 17 00:00:00 2001 From: Anand Swaminathan Date: Fri, 6 Mar 2020 10:28:46 -0800 Subject: [PATCH] [Backward Incompatible] Implementation of Placement manager and multi cluster refactor (#71) --- cmd/entrypoints/clusterresource.go | 6 +- pkg/executioncluster/execution_target.go | 7 +- pkg/executioncluster/impl/factory.go | 5 +- pkg/executioncluster/impl/in_cluster.go | 3 +- pkg/executioncluster/impl/in_cluster_test.go | 5 +- .../impl/random_cluster_selector.go | 194 ++++++++++-------- .../impl/random_cluster_selector_test.go | 118 +++++++---- pkg/executioncluster/interfaces/cluster.go | 4 +- pkg/executioncluster/mocks/mock_cluster.go | 12 +- .../testdata/clusters_config.yaml | 17 +- .../impl/resources/resource_manager.go | 4 +- pkg/manager/interfaces/resource.go | 17 +- pkg/rpc/adminservice/base.go | 3 +- pkg/runtime/cluster_config_provider.go | 8 +- pkg/runtime/config_provider_test.go | 3 - .../interfaces/cluster_configuration.go | 27 ++- pkg/workflowengine/impl/propeller_executor.go | 10 +- .../impl/propeller_executor_test.go | 22 +- 18 files changed, 274 insertions(+), 191 deletions(-) diff --git a/cmd/entrypoints/clusterresource.go b/cmd/entrypoints/clusterresource.go index cc1696124..726ab6ba9 100644 --- a/cmd/entrypoints/clusterresource.go +++ b/cmd/entrypoints/clusterresource.go @@ -56,7 +56,8 @@ var controllerRunCmd = &cobra.Command{ scope.NewSubScope("cluster"), cfg.KubeConfig, cfg.Master, - configuration) + configuration, + db) clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope) clusterResourceController.Run() @@ -88,7 +89,8 @@ var controllerSyncCmd = &cobra.Command{ scope.NewSubScope("cluster"), cfg.KubeConfig, cfg.Master, - configuration) + configuration, + db) clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope) err := clusterResourceController.Sync(ctx) diff --git a/pkg/executioncluster/execution_target.go b/pkg/executioncluster/execution_target.go index d594f19e1..6d9fd7697 100644 --- a/pkg/executioncluster/execution_target.go +++ b/pkg/executioncluster/execution_target.go @@ -1,7 +1,6 @@ package executioncluster import ( - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" flyteclient "github.com/lyft/flytepropeller/pkg/client/clientset/versioned" "github.com/lyft/flytestdlib/random" @@ -11,7 +10,11 @@ import ( // Spec to determine the execution target type ExecutionTargetSpec struct { TargetID string - ExecutionID *core.WorkflowExecutionIdentifier + ExecutionID string + Project string + Domain string + Workflow string + LaunchPlan string } // Client object of the target execution cluster diff --git a/pkg/executioncluster/impl/factory.go b/pkg/executioncluster/impl/factory.go index 4c85379d1..1a0e44984 100644 --- a/pkg/executioncluster/impl/factory.go +++ b/pkg/executioncluster/impl/factory.go @@ -2,11 +2,12 @@ package impl import ( executioncluster_interface "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + "github.com/lyft/flyteadmin/pkg/repositories" "github.com/lyft/flyteadmin/pkg/runtime/interfaces" "github.com/lyft/flytestdlib/promutils" ) -func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration) executioncluster_interface.ClusterInterface { +func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration, db repositories.RepositoryInterface) executioncluster_interface.ClusterInterface { switch len(config.ClusterConfiguration().GetClusterConfigs()) { case 0: cluster, err := NewInCluster(scope, kubeConfig, master) @@ -15,7 +16,7 @@ func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, confi } return cluster default: - cluster, err := NewRandomClusterSelector(scope, config.ClusterConfiguration(), &clusterExecutionTargetProvider{}, config.ApplicationConfiguration().GetDomainsConfig()) + cluster, err := NewRandomClusterSelector(scope, config.ClusterConfiguration(), &clusterExecutionTargetProvider{}, db) if err != nil { panic(err) } diff --git a/pkg/executioncluster/impl/in_cluster.go b/pkg/executioncluster/impl/in_cluster.go index 05a28713d..0254973ba 100644 --- a/pkg/executioncluster/impl/in_cluster.go +++ b/pkg/executioncluster/impl/in_cluster.go @@ -1,6 +1,7 @@ package impl import ( + "context" "fmt" "github.com/lyft/flyteadmin/pkg/executioncluster" @@ -15,7 +16,7 @@ type InCluster struct { target executioncluster.ExecutionTarget } -func (i InCluster) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { +func (i InCluster) GetTarget(ctx context.Context, spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { if spec != nil && spec.TargetID != "" { return nil, errors.New(fmt.Sprintf("remote target %s is not supported", spec.TargetID)) } diff --git a/pkg/executioncluster/impl/in_cluster_test.go b/pkg/executioncluster/impl/in_cluster_test.go index 414fab1e0..4b1cb9629 100644 --- a/pkg/executioncluster/impl/in_cluster_test.go +++ b/pkg/executioncluster/impl/in_cluster_test.go @@ -1,6 +1,7 @@ package impl import ( + "context" "testing" "github.com/lyft/flyteadmin/pkg/executioncluster" @@ -14,7 +15,7 @@ func TestInClusterGetTarget(t *testing.T) { ID: "t1", }, } - target, err := cluster.GetTarget(nil) + target, err := cluster.GetTarget(context.Background(), nil) assert.Nil(t, err) assert.Equal(t, "t1", target.ID) } @@ -23,7 +24,7 @@ func TestInClusterGetRemoteTarget(t *testing.T) { cluster := InCluster{ target: executioncluster.ExecutionTarget{}, } - _, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "t1"}) + _, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: "t1"}) assert.EqualError(t, err, "remote target t1 is not supported") } diff --git a/pkg/executioncluster/impl/random_cluster_selector.go b/pkg/executioncluster/impl/random_cluster_selector.go index 0450fa060..724fb2345 100644 --- a/pkg/executioncluster/impl/random_cluster_selector.go +++ b/pkg/executioncluster/impl/random_cluster_selector.go @@ -6,6 +6,16 @@ import ( "hash/fnv" "math/rand" + "github.com/lyft/flytestdlib/logger" + + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + + "github.com/lyft/flyteadmin/pkg/errors" + "github.com/lyft/flyteadmin/pkg/manager/impl/resources" + managerInterfaces "github.com/lyft/flyteadmin/pkg/manager/interfaces" + "github.com/lyft/flyteadmin/pkg/repositories" + "google.golang.org/grpc/codes" + "github.com/lyft/flyteadmin/pkg/executioncluster" "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" @@ -18,8 +28,10 @@ import ( // Implementation of Random cluster selector // Selects cluster based on weights and domains. type RandomClusterSelector struct { - domainWeightedRandomMap map[string]random.WeightedRandomList - executionTargetMap map[string]executioncluster.ExecutionTarget + equalWeightedAllClusters random.WeightedRandomList + labelWeightedRandomMap map[string]random.WeightedRandomList + executionTargetMap map[string]executioncluster.ExecutionTarget + resourceManager managerInterfaces.ResourceInterface } func getRandSource(seed string) (rand.Source, error) { @@ -32,85 +44,59 @@ func getRandSource(seed string) (rand.Source, error) { return rand.NewSource(hashedSeed), nil } -func getValidDomainMap(validDomains runtime.DomainsConfig) map[string]runtime.Domain { - domainMap := make(map[string]runtime.Domain) - for _, domain := range validDomains { - domainMap[domain.ID] = domain - } - return domainMap -} - -func getExecutionTargetMap(scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, clusterConfig runtime.ClusterConfiguration) (map[string]executioncluster.ExecutionTarget, error) { +func getExecutionTargets(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, + clusterConfig runtime.ClusterConfiguration) (random.WeightedRandomList, map[string]executioncluster.ExecutionTarget, error) { executionTargetMap := make(map[string]executioncluster.ExecutionTarget) + entries := make([]random.Entry, 0) for _, cluster := range clusterConfig.GetClusterConfigs() { if _, ok := executionTargetMap[cluster.Name]; ok { - return nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name) + return nil, nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name) } executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster) if err != nil { - return nil, err + return nil, nil, err } executionTargetMap[cluster.Name] = *executionTarget - } - return executionTargetMap, nil -} - -func getDomainsForCluster(cluster runtime.ClusterConfig, domainMap map[string]runtime.Domain) ([]string, error) { - if len(cluster.AllowedDomains) == 0 { - allDomains := make([]string, len(domainMap)) - index := 0 - for id := range domainMap { - allDomains[index] = id - index++ + if executionTarget.Enabled { + targetEntry := random.Entry{ + Item: *executionTarget, + } + entries = append(entries, targetEntry) } - return allDomains, nil } - for _, allowedDomain := range cluster.AllowedDomains { - if _, ok := domainMap[allowedDomain]; !ok { - return nil, fmt.Errorf("invalid domain %s", allowedDomain) - } + weightedRandomList, err := random.NewWeightedRandom(ctx, entries) + if err != nil { + return nil, nil, err } - return cluster.AllowedDomains, nil + return weightedRandomList, executionTargetMap, nil } -func getDomainWeightedRandomForCluster(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, - clusterConfig runtime.ClusterConfiguration, - domainMap map[string]runtime.Domain) (map[string]random.WeightedRandomList, error) { - domainEntriesMap := make(map[string][]random.Entry) - for _, cluster := range clusterConfig.GetClusterConfigs() { - // If cluster is not enabled, it is not eligible for selection - if !cluster.Enabled { - continue - } - executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster) - if err != nil { - return nil, err - } - targetEntry := random.Entry{ - Item: *executionTarget, - Weight: cluster.Weight, - } - clusterDomains, err := getDomainsForCluster(cluster, domainMap) - if err != nil { - return nil, err - } - for _, domain := range clusterDomains { - if _, ok := domainEntriesMap[domain]; ok { - domainEntriesMap[domain] = append(domainEntriesMap[domain], targetEntry) - } else { - domainEntriesMap[domain] = []random.Entry{targetEntry} +func getLabeledWeightedRandomForCluster(ctx context.Context, + clusterConfig runtime.ClusterConfiguration, executionTargetMap map[string]executioncluster.ExecutionTarget) (map[string]random.WeightedRandomList, error) { + labeledWeightedRandomMap := make(map[string]random.WeightedRandomList) + for label, clusterEntities := range clusterConfig.GetLabelClusterMap() { + entries := make([]random.Entry, 0) + for _, clusterEntity := range clusterEntities { + cluster := executionTargetMap[clusterEntity.ID] + // If cluster is not enabled, it is not eligible for selection + if !cluster.Enabled { + continue + } + targetEntry := random.Entry{ + Item: cluster, + Weight: clusterEntity.Weight, } + entries = append(entries, targetEntry) } - } - domainWeightedRandomMap := make(map[string]random.WeightedRandomList) - for domain, entries := range domainEntriesMap { - weightedRandomList, err := random.NewWeightedRandom(ctx, entries) - if err != nil { - return nil, err + if len(entries) > 0 { + weightedRandomList, err := random.NewWeightedRandom(ctx, entries) + if err != nil { + return nil, err + } + labeledWeightedRandomMap[label] = weightedRandomList } - domainWeightedRandomMap[domain] = weightedRandomList } - return domainWeightedRandomMap, nil + return labeledWeightedRandomMap, nil } func (s RandomClusterSelector) GetAllValidTargets() []executioncluster.ExecutionTarget { @@ -123,9 +109,9 @@ func (s RandomClusterSelector) GetAllValidTargets() []executioncluster.Execution return v } -func (s RandomClusterSelector) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { - if spec == nil || (spec.TargetID == "" && spec.ExecutionID == nil) { - return nil, fmt.Errorf("invalid executionTargetSpec %v", spec) +func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { + if spec == nil { + return nil, fmt.Errorf("empty executionTargetSpec") } if spec.TargetID != "" { if val, ok := s.executionTargetMap[spec.TargetID]; ok { @@ -133,41 +119,67 @@ func (s RandomClusterSelector) GetTarget(spec *executioncluster.ExecutionTargetS } return nil, fmt.Errorf("invalid cluster target %s", spec.TargetID) } - if spec.ExecutionID != nil { - if weightedRandomList, ok := s.domainWeightedRandomMap[spec.ExecutionID.GetDomain()]; ok { - executionName := spec.ExecutionID.GetName() - if executionName != "" { - randSrc, err := getRandSource(executionName) - if err != nil { - return nil, err - } - result, err := weightedRandomList.GetWithSeed(randSrc) - if err != nil { - return nil, err - } - execTarget := result.(executioncluster.ExecutionTarget) - return &execTarget, nil - } - execTarget := weightedRandomList.Get().(executioncluster.ExecutionTarget) - return &execTarget, nil + resource, err := s.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{ + Project: spec.Project, + Domain: spec.Domain, + Workflow: spec.Workflow, + LaunchPlan: spec.LaunchPlan, + ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL, + }) + if err != nil { + if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { + return nil, err } + return nil, err } - return nil, fmt.Errorf("invalid executionTargetSpec %v", *spec) + var weightedRandomList random.WeightedRandomList + if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil { + label := resource.Attributes.GetExecutionClusterLabel().Value + if _, ok := s.labelWeightedRandomMap[label]; ok { + weightedRandomList = s.labelWeightedRandomMap[label] + } else { + logger.Debugf(ctx, "No cluster mapping found for the label %s", label) + } + } else { + logger.Debugf(ctx, "No override found for the spec %v", spec) + } + // If there is no label associated (or) if the label is invalid, choose from all enabled clusters. + // Note that if there is a valid label with zero "Enabled" clusters, we still choose from all enabled ones. + if weightedRandomList == nil { + weightedRandomList = s.equalWeightedAllClusters + } + + executionName := spec.ExecutionID + if executionName != "" { + randSrc, err := getRandSource(executionName) + if err != nil { + return nil, err + } + result, err := weightedRandomList.GetWithSeed(randSrc) + if err != nil { + return nil, err + } + execTarget := result.(executioncluster.ExecutionTarget) + return &execTarget, nil + } + execTarget := weightedRandomList.Get().(executioncluster.ExecutionTarget) + return &execTarget, nil } -func NewRandomClusterSelector(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration, executionTargetProvider interfaces.ExecutionTargetProvider, domainConfig *runtime.DomainsConfig) (interfaces.ClusterInterface, error) { - executionTargetMap, err := getExecutionTargetMap(scope, executionTargetProvider, clusterConfig) +func NewRandomClusterSelector(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration, executionTargetProvider interfaces.ExecutionTargetProvider, db repositories.RepositoryInterface) (interfaces.ClusterInterface, error) { + equalWeightedAllClusters, executionTargetMap, err := getExecutionTargets(context.Background(), scope, executionTargetProvider, clusterConfig) if err != nil { return nil, err } - domainMap := getValidDomainMap(*domainConfig) - domainWeightedRandomMap, err := getDomainWeightedRandomForCluster(context.Background(), scope, executionTargetProvider, clusterConfig, domainMap) + labelWeightedRandomMap, err := getLabeledWeightedRandomForCluster(context.Background(), clusterConfig, executionTargetMap) if err != nil { return nil, err } return &RandomClusterSelector{ - domainWeightedRandomMap: domainWeightedRandomMap, - executionTargetMap: executionTargetMap, + labelWeightedRandomMap: labelWeightedRandomMap, + executionTargetMap: executionTargetMap, + resourceManager: resources.NewResourceManager(db), + equalWeightedAllClusters: equalWeightedAllClusters, }, nil } diff --git a/pkg/executioncluster/impl/random_cluster_selector_test.go b/pkg/executioncluster/impl/random_cluster_selector_test.go index d655da920..e6345a8ff 100644 --- a/pkg/executioncluster/impl/random_cluster_selector_test.go +++ b/pkg/executioncluster/impl/random_cluster_selector_test.go @@ -8,12 +8,18 @@ import ( "strings" "testing" + "github.com/golang/protobuf/proto" + "github.com/lyft/flyteadmin/pkg/errors" + repo_interface "github.com/lyft/flyteadmin/pkg/repositories/interfaces" + repo_mock "github.com/lyft/flyteadmin/pkg/repositories/mocks" + "github.com/lyft/flyteadmin/pkg/repositories/models" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "google.golang.org/grpc/codes" + "github.com/lyft/flyteadmin/pkg/executioncluster" interfaces2 "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" "github.com/lyft/flyteadmin/pkg/executioncluster/mocks" "github.com/lyft/flyteadmin/pkg/runtime" - "github.com/lyft/flyteadmin/pkg/runtime/interfaces" - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/config" "github.com/lyft/flytestdlib/config/viper" "github.com/lyft/flytestdlib/promutils" @@ -21,7 +27,9 @@ import ( "github.com/stretchr/testify/assert" ) -var defaultDomains = []interfaces.Domain{{ID: "d1", Name: "d1"}, {ID: "d2", Name: "d2"}, {ID: "d3", Name: "domain3"}} +const testProject = "project" +const testDomain = "domain" +const testWorkflow = "name" func initTestConfig(fileName string) error { var searchPaths []string @@ -36,86 +44,120 @@ func initTestConfig(fileName string) error { return configAccessor.UpdateConfig(context.Background()) } -func getRandomClusterSelectorForTest(t *testing.T, domainsConfig interfaces.DomainsConfig) interfaces2.ClusterInterface { +func getRandomClusterSelectorForTest(t *testing.T) interfaces2.ClusterInterface { var clusterScope promutils.Scope err := initTestConfig("clusters_config.yaml") assert.NoError(t, err) + db := repo_mock.NewMockRepository() + db.ResourceRepo().(*repo_mock.MockResourceRepo).GetFunction = func(ctx context.Context, ID repo_interface.ResourceID) (resource models.Resource, e error) { + assert.Equal(t, "EXECUTION_CLUSTER_LABEL", ID.ResourceType) + if ID.Project == "" { + return models.Resource{}, errors.NewFlyteAdminErrorf(codes.NotFound, + "Resource [%+v] not found", ID) + } + response := models.Resource{ + Project: ID.Project, + Domain: ID.Domain, + Workflow: ID.Workflow, + ResourceType: ID.ResourceType, + LaunchPlan: ID.LaunchPlan, + } + if ID.Project == testProject && ID.Domain == testDomain { + matchingAttributes := &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ExecutionClusterLabel{ + ExecutionClusterLabel: &admin.ExecutionClusterLabel{ + Value: "test", + }, + }, + } + marshalledMatchingAttributes, _ := proto.Marshal(matchingAttributes) + response.Attributes = marshalledMatchingAttributes + } else { + matchingAttributes := &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ExecutionClusterLabel{ + ExecutionClusterLabel: &admin.ExecutionClusterLabel{ + Value: "all", + }, + }, + } + marshalledMatchingAttributes, _ := proto.Marshal(matchingAttributes) + response.Attributes = marshalledMatchingAttributes + } + return response, nil + } configProvider := runtime.NewConfigurationProvider() - randomCluster, err := NewRandomClusterSelector(clusterScope, configProvider.ClusterConfiguration(), &mocks.MockExecutionTargetProvider{}, &domainsConfig) + randomCluster, err := NewRandomClusterSelector(clusterScope, configProvider.ClusterConfiguration(), &mocks.MockExecutionTargetProvider{}, db) assert.NoError(t, err) return randomCluster } func TestRandomClusterSelectorGetTarget(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "testcluster"}) + cluster := getRandomClusterSelectorForTest(t) + target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: "testcluster"}) assert.Nil(t, err) assert.Equal(t, "testcluster", target.ID) assert.False(t, target.Enabled) - target, err = cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "testcluster2"}) + target, err = cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: "testcluster2"}) assert.Nil(t, err) assert.Equal(t, "testcluster2", target.ID) assert.True(t, target.Enabled) } func TestRandomClusterSelectorGetTargetForDomain(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ - Domain: "d1", - }}) + cluster := getRandomClusterSelectorForTest(t) + target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{ + Project: testProject, + Domain: testDomain, + ExecutionID: "e", + }) assert.Nil(t, err) assert.Equal(t, "testcluster2", target.ID) assert.True(t, target.Enabled) } -func TestRandomClusterSelectorGetTargetForDomainAndExecution(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ - Domain: "d2", - Name: "exec", - }}) +func TestRandomClusterSelectorGetTargetForExecution(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t) + target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{ + Project: testProject, + Domain: "different", + Workflow: testWorkflow, + ExecutionID: "e1", + }) assert.Nil(t, err) assert.Equal(t, "testcluster3", target.ID) assert.True(t, target.Enabled) } func TestRandomClusterSelectorGetTargetForDomainAndExecution2(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ - Domain: "d2", - Name: "exec2", - }}) + cluster := getRandomClusterSelectorForTest(t) + target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{ + Project: testProject, + Domain: "different", + Workflow: testWorkflow, + ExecutionID: "e22", + }) assert.Nil(t, err) assert.Equal(t, "testcluster2", target.ID) assert.True(t, target.Enabled) } -func TestRandomClusterSelectorGetTargetForInvalidDomain(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - _, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ - Domain: "d4", - Name: "exec", - }}) - assert.EqualError(t, err, "invalid executionTargetSpec { domain:\"d4\" name:\"exec\" }") -} - func TestRandomClusterSelectorGetRandomTarget(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - _, err := cluster.GetTarget(nil) + cluster := getRandomClusterSelectorForTest(t) + _, err := cluster.GetTarget(context.Background(), nil) assert.NotNil(t, err) - assert.EqualError(t, err, "invalid executionTargetSpec ") + assert.EqualError(t, err, "empty executionTargetSpec") } func TestRandomClusterSelectorGetRemoteTarget(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) - _, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "cluster-3"}) + cluster := getRandomClusterSelectorForTest(t) + _, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: "cluster-3"}) assert.NotNil(t, err) assert.EqualError(t, err, "invalid cluster target cluster-3") } func TestRandomClusterSelectorGetAllValidTargets(t *testing.T) { - cluster := getRandomClusterSelectorForTest(t, defaultDomains) + cluster := getRandomClusterSelectorForTest(t) targets := cluster.GetAllValidTargets() assert.Equal(t, 2, len(targets)) } diff --git a/pkg/executioncluster/interfaces/cluster.go b/pkg/executioncluster/interfaces/cluster.go index 1b09767c9..a7debde56 100644 --- a/pkg/executioncluster/interfaces/cluster.go +++ b/pkg/executioncluster/interfaces/cluster.go @@ -1,11 +1,13 @@ package interfaces import ( + "context" + "github.com/lyft/flyteadmin/pkg/executioncluster" ) // Interface for the Execution Cluster type ClusterInterface interface { - GetTarget(*executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) + GetTarget(context.Context, *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) GetAllValidTargets() []executioncluster.ExecutionTarget } diff --git a/pkg/executioncluster/mocks/mock_cluster.go b/pkg/executioncluster/mocks/mock_cluster.go index 74fc3552c..b0f22f364 100644 --- a/pkg/executioncluster/mocks/mock_cluster.go +++ b/pkg/executioncluster/mocks/mock_cluster.go @@ -1,8 +1,12 @@ package mocks -import "github.com/lyft/flyteadmin/pkg/executioncluster" +import ( + "context" -type GetTargetFunc func(*executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) + "github.com/lyft/flyteadmin/pkg/executioncluster" +) + +type GetTargetFunc func(context.Context, *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) type GetAllValidTargetsFunc func() []executioncluster.ExecutionTarget type MockCluster struct { @@ -18,9 +22,9 @@ func (m *MockCluster) SetGetAllValidTargetsCallback(getAllValidTargetsFunc GetAl m.getAllValidTargetsFunc = getAllValidTargetsFunc } -func (m *MockCluster) GetTarget(execCluster *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { +func (m *MockCluster) GetTarget(ctx context.Context, execCluster *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { if m.getTargetFunc != nil { - return m.getTargetFunc(execCluster) + return m.getTargetFunc(ctx, execCluster) } return nil, nil } diff --git a/pkg/executioncluster/testdata/clusters_config.yaml b/pkg/executioncluster/testdata/clusters_config.yaml index b17d94a3f..a6710c524 100644 --- a/pkg/executioncluster/testdata/clusters_config.yaml +++ b/pkg/executioncluster/testdata/clusters_config.yaml @@ -1,4 +1,13 @@ clusters: + labelClusterMap: + test: + - id: testcluster + weight: 1 + all: + - id: testcluster2 + weight: 0.5 + - id: testcluster3 + weight: 0.5 clusterConfigs: - name: "testcluster" endpoint: "testcluster_endpoint" @@ -8,11 +17,7 @@ clusters: certPath: "/path/to/testcluster/cert" - name: "testcluster2" endpoint: "testcluster2_endpoint" - weight: 0.5 enabled: true - allowedDomains: - - "d1" - - "d2" auth: type: "file_path" tokenPath: "/path/to/testcluster2/token" @@ -20,10 +25,6 @@ clusters: - name: "testcluster3" endpoint: "testcluster3_endpoint" enabled: true - weight: 0.5 - allowedDomains: - - "d2" - - "d3" auth: type: "file_path" tokenPath: "/path/to/testcluster3/token" diff --git a/pkg/manager/impl/resources/resource_manager.go b/pkg/manager/impl/resources/resource_manager.go index 11bc27c46..f6b1e82d7 100644 --- a/pkg/manager/impl/resources/resource_manager.go +++ b/pkg/manager/impl/resources/resource_manager.go @@ -78,12 +78,12 @@ func (m *ResourceManager) GetWorkflowAttributes( if err := validation.ValidateWorkflowAttributesGetRequest(request); err != nil { return nil, err } - projectAttributesModel, err := m.db.ResourceRepo().Get( + workflowAttributesModel, err := m.db.ResourceRepo().Get( ctx, repo_interface.ResourceID{Project: request.Project, Domain: request.Domain, Workflow: request.Workflow, ResourceType: request.ResourceType.String()}) if err != nil { return nil, err } - workflowAttributes, err := transformers.FromResourceModelToWorkflowAttributes(projectAttributesModel) + workflowAttributes, err := transformers.FromResourceModelToWorkflowAttributes(workflowAttributesModel) if err != nil { return nil, err } diff --git a/pkg/manager/interfaces/resource.go b/pkg/manager/interfaces/resource.go index 2ca388ab9..c8956dbd5 100644 --- a/pkg/manager/interfaces/resource.go +++ b/pkg/manager/interfaces/resource.go @@ -8,15 +8,10 @@ import ( // Interface for managing project, domain and workflow -specific attributes. type ResourceInterface interface { + ListAll(ctx context.Context, request admin.ListMatchableAttributesRequest) ( + *admin.ListMatchableAttributesResponse, error) GetResource(ctx context.Context, request ResourceRequest) (*ResourceResponse, error) - UpdateWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesUpdateRequest) ( - *admin.WorkflowAttributesUpdateResponse, error) - GetWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesGetRequest) ( - *admin.WorkflowAttributesGetResponse, error) - DeleteWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesDeleteRequest) ( - *admin.WorkflowAttributesDeleteResponse, error) - UpdateProjectDomainAttributes(ctx context.Context, request admin.ProjectDomainAttributesUpdateRequest) ( *admin.ProjectDomainAttributesUpdateResponse, error) GetProjectDomainAttributes(ctx context.Context, request admin.ProjectDomainAttributesGetRequest) ( @@ -24,8 +19,12 @@ type ResourceInterface interface { DeleteProjectDomainAttributes(ctx context.Context, request admin.ProjectDomainAttributesDeleteRequest) ( *admin.ProjectDomainAttributesDeleteResponse, error) - ListAll(ctx context.Context, request admin.ListMatchableAttributesRequest) ( - *admin.ListMatchableAttributesResponse, error) + UpdateWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesUpdateRequest) ( + *admin.WorkflowAttributesUpdateResponse, error) + GetWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesGetRequest) ( + *admin.WorkflowAttributesGetResponse, error) + DeleteWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesDeleteRequest) ( + *admin.WorkflowAttributesDeleteResponse, error) } // TODO we can move this to flyteidl, once we are exposing an endpoint diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index b689f3078..17d1cebac 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -80,7 +80,8 @@ func NewAdminServer(kubeConfig, master string) *AdminService { adminScope.NewSubScope("executor").NewSubScope("cluster"), kubeConfig, master, - configuration) + configuration, + db) workflowExecutor := workflowengine.NewFlytePropeller( applicationConfiguration.RoleNameKey, execCluster, diff --git a/pkg/runtime/cluster_config_provider.go b/pkg/runtime/cluster_config_provider.go index ae59b6c61..94819d152 100644 --- a/pkg/runtime/cluster_config_provider.go +++ b/pkg/runtime/cluster_config_provider.go @@ -17,11 +17,13 @@ var clusterConfig = config.MustRegisterSection(clustersKey, &interfaces.Clusters // Implementation of an interfaces.ClusterConfiguration type ClusterConfigurationProvider struct{} -func (p *ClusterConfigurationProvider) GetClusterSelectionStrategy() interfaces.ClusterSelectionStrategy { +func (p *ClusterConfigurationProvider) GetLabelClusterMap() map[string][]interfaces.ClusterEntity { if clusterConfig != nil { - return clusterConfig.GetConfig().(*interfaces.Clusters).ClusterSelection + clusters := clusterConfig.GetConfig().(*interfaces.Clusters) + return clusters.LabelClusterMap } - return interfaces.ClusterSelectionRandom + logger.Warningf(context.Background(), "Failed to find clusters in config. Returning an empty slice") + return make(map[string][]interfaces.ClusterEntity) } func (p *ClusterConfigurationProvider) GetClusterConfigs() []interfaces.ClusterConfig { diff --git a/pkg/runtime/config_provider_test.go b/pkg/runtime/config_provider_test.go index b132673ae..0e19811bb 100644 --- a/pkg/runtime/config_provider_test.go +++ b/pkg/runtime/config_provider_test.go @@ -7,8 +7,6 @@ import ( "strings" "testing" - "github.com/lyft/flyteadmin/pkg/runtime/interfaces" - "path/filepath" "github.com/lyft/flytestdlib/config" @@ -35,7 +33,6 @@ func TestClusterConfig(t *testing.T) { configProvider := NewConfigurationProvider() clusterConfig := configProvider.ClusterConfiguration() - assert.Equal(t, interfaces.ClusterSelectionRandom, clusterConfig.GetClusterSelectionStrategy()) clusters := clusterConfig.GetClusterConfigs() assert.Equal(t, 2, len(clusters)) diff --git a/pkg/runtime/interfaces/cluster_configuration.go b/pkg/runtime/interfaces/cluster_configuration.go index a28ef1c73..880a19825 100644 --- a/pkg/runtime/interfaces/cluster_configuration.go +++ b/pkg/runtime/interfaces/cluster_configuration.go @@ -8,12 +8,10 @@ import ( // Holds details about a cluster used for workflow execution. type ClusterConfig struct { - Name string `json:"name"` - Endpoint string `json:"endpoint"` - Auth Auth `json:"auth"` - Enabled bool `json:"enabled"` - Weight float32 `json:"weight"` - AllowedDomains []string `json:"allowedDomains"` + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Auth Auth `json:"auth"` + Enabled bool `json:"enabled"` } type Auth struct { @@ -22,11 +20,10 @@ type Auth struct { CertPath string `json:"certPath"` } -type ClusterSelectionStrategy string - -var ( - ClusterSelectionRandom ClusterSelectionStrategy -) +type ClusterEntity struct { + ID string `json:"id"` + Weight float32 `json:"weight"` +} func (auth Auth) GetCA() ([]byte, error) { cert, err := ioutil.ReadFile(auth.CertPath) @@ -45,8 +42,8 @@ func (auth Auth) GetToken() (string, error) { } type Clusters struct { - ClusterConfigs []ClusterConfig `json:"clusterConfigs"` - ClusterSelection ClusterSelectionStrategy `json:"clusterSelectionStrategy"` + ClusterConfigs []ClusterConfig `json:"clusterConfigs"` + LabelClusterMap map[string][]ClusterEntity `json:"labelClusterMap"` } // Provides values set in runtime configuration files. @@ -55,6 +52,6 @@ type ClusterConfiguration interface { // Returns clusters defined in runtime configuration files. GetClusterConfigs() []ClusterConfig - // The cluster selection strategy setting - GetClusterSelectionStrategy() ClusterSelectionStrategy + // Returns label cluster map for routing + GetLabelClusterMap() map[string][]ClusterEntity } diff --git a/pkg/workflowengine/impl/propeller_executor.go b/pkg/workflowengine/impl/propeller_executor.go index fd8c76c86..ad58413aa 100644 --- a/pkg/workflowengine/impl/propeller_executor.go +++ b/pkg/workflowengine/impl/propeller_executor.go @@ -118,9 +118,13 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E flyteWf.Annotations = annotations executionTargetSpec := executioncluster.ExecutionTargetSpec{ - ExecutionID: input.ExecutionID, + Project: input.ExecutionID.Project, + Domain: input.ExecutionID.Domain, + Workflow: input.Reference.Spec.WorkflowId.Name, + LaunchPlan: input.Reference.Id.Name, + ExecutionID: input.ExecutionID.Name, } - targetCluster, err := c.executionCluster.GetTarget(&executionTargetSpec) + targetCluster, err := c.executionCluster.GetTarget(ctx, &executionTargetSpec) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create workflow in propeller %v", err) } @@ -148,7 +152,7 @@ func (c *FlytePropeller) TerminateWorkflowExecution( return errors.NewFlyteAdminErrorf(codes.Internal, "invalid execution id") } namespace := common.GetNamespaceName(c.config.GetNamespaceMappingConfig(), input.ExecutionID.GetProject(), input.ExecutionID.GetDomain()) - target, err := c.executionCluster.GetTarget(&executioncluster.ExecutionTargetSpec{ + target, err := c.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{ TargetID: input.Cluster, }) if err != nil { diff --git a/pkg/workflowengine/impl/propeller_executor_test.go b/pkg/workflowengine/impl/propeller_executor_test.go index 69e666782..beea62cbc 100644 --- a/pkg/workflowengine/impl/propeller_executor_test.go +++ b/pkg/workflowengine/impl/propeller_executor_test.go @@ -114,7 +114,7 @@ func (b *FakeK8FlyteClient) FlyteworkflowV1alpha1() v1alpha12.FlyteworkflowV1alp func getFakeExecutionCluster() interfaces2.ClusterInterface { fakeCluster := cluster_mock.MockCluster{} - fakeCluster.SetGetTargetCallback(func(spec *executioncluster.ExecutionTargetSpec) (target *executioncluster.ExecutionTarget, e error) { + fakeCluster.SetGetTargetCallback(func(ctx context.Context, spec *executioncluster.ExecutionTargetSpec) (target *executioncluster.ExecutionTarget, e error) { return &executioncluster.ExecutionTarget{ ID: "C1", FlyteClient: &FakeK8FlyteClient{}, @@ -130,8 +130,8 @@ func TestExecuteWorkflowHappyCase(t *testing.T) { Domain: "d", Name: "n", } - cluster.SetGetTargetCallback(func(spec *executioncluster.ExecutionTargetSpec) (target *executioncluster.ExecutionTarget, e error) { - assert.Equal(t, execID, *spec.ExecutionID) + cluster.SetGetTargetCallback(func(ctx context.Context, spec *executioncluster.ExecutionTargetSpec) (target *executioncluster.ExecutionTarget, e error) { + assert.Equal(t, execID.Name, spec.ExecutionID) return &executioncluster.ExecutionTarget{ ID: "C1", FlyteClient: &FakeK8FlyteClient{}, @@ -172,6 +172,9 @@ func TestExecuteWorkflowHappyCase(t *testing.T) { }, Spec: &admin.LaunchPlanSpec{ Role: "role-1", + WorkflowId: &core.Identifier{ + Name: "wf", + }, }, }, AcceptedAt: acceptedAt, @@ -224,6 +227,9 @@ func TestExecuteWorkflowCallFailed(t *testing.T) { }, Spec: &admin.LaunchPlanSpec{ Role: "role-1", + WorkflowId: &core.Identifier{ + Name: "wf", + }, }, }, AcceptedAt: acceptedAt, @@ -273,6 +279,9 @@ func TestExecuteWorkflowAlreadyExistsNoError(t *testing.T) { }, Spec: &admin.LaunchPlanSpec{ Role: "role-1", + WorkflowId: &core.Identifier{ + Name: "wf", + }, }, }, AcceptedAt: acceptedAt, @@ -359,6 +368,11 @@ func TestExecuteWorkflowRoleKeyNotRequired(t *testing.T) { Project: "p", Domain: "d", }, + Spec: &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Name: "wf", + }, + }, }, AcceptedAt: acceptedAt, }) @@ -368,7 +382,7 @@ func TestExecuteWorkflowRoleKeyNotRequired(t *testing.T) { func TestTerminateExecution(t *testing.T) { cluster := getFakeExecutionCluster() - target, err := cluster.GetTarget(nil) + target, err := cluster.GetTarget(context.Background(), nil) assert.Nil(t, err) target.ID = "C2" builder := FlyteWorkflowBuilderTest{}