Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
[Backward Incompatible] Implementation of Placement manager and multi…
Browse files Browse the repository at this point in the history
… cluster refactor (#71)
  • Loading branch information
anandswaminathan authored Mar 6, 2020
1 parent cc9917e commit 60b4c87
Show file tree
Hide file tree
Showing 18 changed files with 274 additions and 191 deletions.
6 changes: 4 additions & 2 deletions cmd/entrypoints/clusterresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ var controllerRunCmd = &cobra.Command{
scope.NewSubScope("cluster"),
cfg.KubeConfig,
cfg.Master,
configuration)
configuration,
db)

clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope)
clusterResourceController.Run()
Expand Down Expand Up @@ -88,7 +89,8 @@ var controllerSyncCmd = &cobra.Command{
scope.NewSubScope("cluster"),
cfg.KubeConfig,
cfg.Master,
configuration)
configuration,
db)

clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope)
err := clusterResourceController.Sync(ctx)
Expand Down
7 changes: 5 additions & 2 deletions pkg/executioncluster/execution_target.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package executioncluster

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
flyteclient "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
"github.com/lyft/flytestdlib/random"

Expand All @@ -11,7 +10,11 @@ import (
// Spec to determine the execution target
type ExecutionTargetSpec struct {
TargetID string
ExecutionID *core.WorkflowExecutionIdentifier
ExecutionID string
Project string
Domain string
Workflow string
LaunchPlan string
}

// Client object of the target execution cluster
Expand Down
5 changes: 3 additions & 2 deletions pkg/executioncluster/impl/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package impl

import (
executioncluster_interface "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories"
"github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flytestdlib/promutils"
)

func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration) executioncluster_interface.ClusterInterface {
func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration, db repositories.RepositoryInterface) executioncluster_interface.ClusterInterface {
switch len(config.ClusterConfiguration().GetClusterConfigs()) {
case 0:
cluster, err := NewInCluster(scope, kubeConfig, master)
Expand All @@ -15,7 +16,7 @@ func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, confi
}
return cluster
default:
cluster, err := NewRandomClusterSelector(scope, config.ClusterConfiguration(), &clusterExecutionTargetProvider{}, config.ApplicationConfiguration().GetDomainsConfig())
cluster, err := NewRandomClusterSelector(scope, config.ClusterConfiguration(), &clusterExecutionTargetProvider{}, db)
if err != nil {
panic(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/executioncluster/impl/in_cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package impl

import (
"context"
"fmt"

"github.com/lyft/flyteadmin/pkg/executioncluster"
Expand All @@ -15,7 +16,7 @@ type InCluster struct {
target executioncluster.ExecutionTarget
}

func (i InCluster) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) {
func (i InCluster) GetTarget(ctx context.Context, spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) {
if spec != nil && spec.TargetID != "" {
return nil, errors.New(fmt.Sprintf("remote target %s is not supported", spec.TargetID))
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/executioncluster/impl/in_cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package impl

import (
"context"
"testing"

"github.com/lyft/flyteadmin/pkg/executioncluster"
Expand All @@ -14,7 +15,7 @@ func TestInClusterGetTarget(t *testing.T) {
ID: "t1",
},
}
target, err := cluster.GetTarget(nil)
target, err := cluster.GetTarget(context.Background(), nil)
assert.Nil(t, err)
assert.Equal(t, "t1", target.ID)
}
Expand All @@ -23,7 +24,7 @@ func TestInClusterGetRemoteTarget(t *testing.T) {
cluster := InCluster{
target: executioncluster.ExecutionTarget{},
}
_, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "t1"})
_, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{TargetID: "t1"})
assert.EqualError(t, err, "remote target t1 is not supported")
}

Expand Down
194 changes: 103 additions & 91 deletions pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ import (
"hash/fnv"
"math/rand"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/manager/impl/resources"
managerInterfaces "github.com/lyft/flyteadmin/pkg/manager/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories"
"google.golang.org/grpc/codes"

"github.com/lyft/flyteadmin/pkg/executioncluster"
"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"

Expand All @@ -18,8 +28,10 @@ import (
// Implementation of Random cluster selector
// Selects cluster based on weights and domains.
type RandomClusterSelector struct {
domainWeightedRandomMap map[string]random.WeightedRandomList
executionTargetMap map[string]executioncluster.ExecutionTarget
equalWeightedAllClusters random.WeightedRandomList
labelWeightedRandomMap map[string]random.WeightedRandomList
executionTargetMap map[string]executioncluster.ExecutionTarget
resourceManager managerInterfaces.ResourceInterface
}

func getRandSource(seed string) (rand.Source, error) {
Expand All @@ -32,85 +44,59 @@ func getRandSource(seed string) (rand.Source, error) {
return rand.NewSource(hashedSeed), nil
}

func getValidDomainMap(validDomains runtime.DomainsConfig) map[string]runtime.Domain {
domainMap := make(map[string]runtime.Domain)
for _, domain := range validDomains {
domainMap[domain.ID] = domain
}
return domainMap
}

func getExecutionTargetMap(scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, clusterConfig runtime.ClusterConfiguration) (map[string]executioncluster.ExecutionTarget, error) {
func getExecutionTargets(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider,
clusterConfig runtime.ClusterConfiguration) (random.WeightedRandomList, map[string]executioncluster.ExecutionTarget, error) {
executionTargetMap := make(map[string]executioncluster.ExecutionTarget)
entries := make([]random.Entry, 0)
for _, cluster := range clusterConfig.GetClusterConfigs() {
if _, ok := executionTargetMap[cluster.Name]; ok {
return nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name)
return nil, nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name)
}
executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster)
if err != nil {
return nil, err
return nil, nil, err
}
executionTargetMap[cluster.Name] = *executionTarget
}
return executionTargetMap, nil
}

func getDomainsForCluster(cluster runtime.ClusterConfig, domainMap map[string]runtime.Domain) ([]string, error) {
if len(cluster.AllowedDomains) == 0 {
allDomains := make([]string, len(domainMap))
index := 0
for id := range domainMap {
allDomains[index] = id
index++
if executionTarget.Enabled {
targetEntry := random.Entry{
Item: *executionTarget,
}
entries = append(entries, targetEntry)
}
return allDomains, nil
}
for _, allowedDomain := range cluster.AllowedDomains {
if _, ok := domainMap[allowedDomain]; !ok {
return nil, fmt.Errorf("invalid domain %s", allowedDomain)
}
weightedRandomList, err := random.NewWeightedRandom(ctx, entries)
if err != nil {
return nil, nil, err
}
return cluster.AllowedDomains, nil
return weightedRandomList, executionTargetMap, nil
}

func getDomainWeightedRandomForCluster(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider,
clusterConfig runtime.ClusterConfiguration,
domainMap map[string]runtime.Domain) (map[string]random.WeightedRandomList, error) {
domainEntriesMap := make(map[string][]random.Entry)
for _, cluster := range clusterConfig.GetClusterConfigs() {
// If cluster is not enabled, it is not eligible for selection
if !cluster.Enabled {
continue
}
executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster)
if err != nil {
return nil, err
}
targetEntry := random.Entry{
Item: *executionTarget,
Weight: cluster.Weight,
}
clusterDomains, err := getDomainsForCluster(cluster, domainMap)
if err != nil {
return nil, err
}
for _, domain := range clusterDomains {
if _, ok := domainEntriesMap[domain]; ok {
domainEntriesMap[domain] = append(domainEntriesMap[domain], targetEntry)
} else {
domainEntriesMap[domain] = []random.Entry{targetEntry}
func getLabeledWeightedRandomForCluster(ctx context.Context,
clusterConfig runtime.ClusterConfiguration, executionTargetMap map[string]executioncluster.ExecutionTarget) (map[string]random.WeightedRandomList, error) {
labeledWeightedRandomMap := make(map[string]random.WeightedRandomList)
for label, clusterEntities := range clusterConfig.GetLabelClusterMap() {
entries := make([]random.Entry, 0)
for _, clusterEntity := range clusterEntities {
cluster := executionTargetMap[clusterEntity.ID]
// If cluster is not enabled, it is not eligible for selection
if !cluster.Enabled {
continue
}
targetEntry := random.Entry{
Item: cluster,
Weight: clusterEntity.Weight,
}
entries = append(entries, targetEntry)
}
}
domainWeightedRandomMap := make(map[string]random.WeightedRandomList)
for domain, entries := range domainEntriesMap {
weightedRandomList, err := random.NewWeightedRandom(ctx, entries)
if err != nil {
return nil, err
if len(entries) > 0 {
weightedRandomList, err := random.NewWeightedRandom(ctx, entries)
if err != nil {
return nil, err
}
labeledWeightedRandomMap[label] = weightedRandomList
}
domainWeightedRandomMap[domain] = weightedRandomList
}
return domainWeightedRandomMap, nil
return labeledWeightedRandomMap, nil
}

func (s RandomClusterSelector) GetAllValidTargets() []executioncluster.ExecutionTarget {
Expand All @@ -123,51 +109,77 @@ func (s RandomClusterSelector) GetAllValidTargets() []executioncluster.Execution
return v
}

func (s RandomClusterSelector) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) {
if spec == nil || (spec.TargetID == "" && spec.ExecutionID == nil) {
return nil, fmt.Errorf("invalid executionTargetSpec %v", spec)
func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) {
if spec == nil {
return nil, fmt.Errorf("empty executionTargetSpec")
}
if spec.TargetID != "" {
if val, ok := s.executionTargetMap[spec.TargetID]; ok {
return &val, nil
}
return nil, fmt.Errorf("invalid cluster target %s", spec.TargetID)
}
if spec.ExecutionID != nil {
if weightedRandomList, ok := s.domainWeightedRandomMap[spec.ExecutionID.GetDomain()]; ok {
executionName := spec.ExecutionID.GetName()
if executionName != "" {
randSrc, err := getRandSource(executionName)
if err != nil {
return nil, err
}
result, err := weightedRandomList.GetWithSeed(randSrc)
if err != nil {
return nil, err
}
execTarget := result.(executioncluster.ExecutionTarget)
return &execTarget, nil
}
execTarget := weightedRandomList.Get().(executioncluster.ExecutionTarget)
return &execTarget, nil
resource, err := s.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{
Project: spec.Project,
Domain: spec.Domain,
Workflow: spec.Workflow,
LaunchPlan: spec.LaunchPlan,
ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
return nil, err
}
return nil, err
}
return nil, fmt.Errorf("invalid executionTargetSpec %v", *spec)
var weightedRandomList random.WeightedRandomList
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label := resource.Attributes.GetExecutionClusterLabel().Value

if _, ok := s.labelWeightedRandomMap[label]; ok {
weightedRandomList = s.labelWeightedRandomMap[label]
} else {
logger.Debugf(ctx, "No cluster mapping found for the label %s", label)
}
} else {
logger.Debugf(ctx, "No override found for the spec %v", spec)
}
// If there is no label associated (or) if the label is invalid, choose from all enabled clusters.
// Note that if there is a valid label with zero "Enabled" clusters, we still choose from all enabled ones.
if weightedRandomList == nil {
weightedRandomList = s.equalWeightedAllClusters
}

executionName := spec.ExecutionID
if executionName != "" {
randSrc, err := getRandSource(executionName)
if err != nil {
return nil, err
}
result, err := weightedRandomList.GetWithSeed(randSrc)
if err != nil {
return nil, err
}
execTarget := result.(executioncluster.ExecutionTarget)
return &execTarget, nil
}
execTarget := weightedRandomList.Get().(executioncluster.ExecutionTarget)
return &execTarget, nil
}

func NewRandomClusterSelector(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration, executionTargetProvider interfaces.ExecutionTargetProvider, domainConfig *runtime.DomainsConfig) (interfaces.ClusterInterface, error) {
executionTargetMap, err := getExecutionTargetMap(scope, executionTargetProvider, clusterConfig)
func NewRandomClusterSelector(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration, executionTargetProvider interfaces.ExecutionTargetProvider, db repositories.RepositoryInterface) (interfaces.ClusterInterface, error) {
equalWeightedAllClusters, executionTargetMap, err := getExecutionTargets(context.Background(), scope, executionTargetProvider, clusterConfig)
if err != nil {
return nil, err
}
domainMap := getValidDomainMap(*domainConfig)
domainWeightedRandomMap, err := getDomainWeightedRandomForCluster(context.Background(), scope, executionTargetProvider, clusterConfig, domainMap)
labelWeightedRandomMap, err := getLabeledWeightedRandomForCluster(context.Background(), clusterConfig, executionTargetMap)
if err != nil {
return nil, err
}
return &RandomClusterSelector{
domainWeightedRandomMap: domainWeightedRandomMap,
executionTargetMap: executionTargetMap,
labelWeightedRandomMap: labelWeightedRandomMap,
executionTargetMap: executionTargetMap,
resourceManager: resources.NewResourceManager(db),
equalWeightedAllClusters: equalWeightedAllClusters,
}, nil
}
Loading

0 comments on commit 60b4c87

Please sign in to comment.