Skip to content

Commit

Permalink
feat: seacher return multiple scheduler clusters (#1175)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Mar 22, 2022
1 parent 23ae03f commit f2031d5
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 101 deletions.
13 changes: 6 additions & 7 deletions manager/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,21 +410,20 @@ func (s *Server) ListSchedulers(ctx context.Context, req *manager.ListSchedulers
return nil, status.Error(codes.Unknown, err.Error())
}

// Search optimal scheduler cluster
// Search optimal scheduler clusters
log.Infof("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo)
schedulerCluster, err := s.searcher.FindSchedulerCluster(ctx, schedulerClusters, req)
schedulerClusters, err := s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req)
if err != nil {
log.Errorf("can not matching scheduler cluster %v", err)
return nil, status.Error(codes.NotFound, "scheduler cluster not found")
}
log.Infof("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters))

schedulers := []model.Scheduler{}
if err := s.db.WithContext(ctx).Find(&schedulers, &model.Scheduler{
State: model.SchedulerStateActive,
SchedulerClusterID: schedulerCluster.ID,
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
for _, schedulerCluster := range schedulerClusters {
for _, scheduler := range schedulerCluster.Schedulers {
schedulers = append(schedulers, scheduler)
}
}

for _, scheduler := range schedulers {
Expand Down
146 changes: 91 additions & 55 deletions manager/searcher/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"

"github.com/mitchellh/mapstructure"
Expand All @@ -45,14 +46,17 @@ const (
)

const (
// SecurityDomain affinity weight
securityDomainAffinityWeight float64 = 0.4

// IDC affinity weight
idcAffinityWeight float64 = 0.5
idcAffinityWeight float64 = 0.3

// NetTopology affinity weight
netTopologyAffinityWeight = 0.3
netTopologyAffinityWeight = 0.2

// Location affinity weight
locationAffinityWeight = 0.2
locationAffinityWeight = 0.1
)

const (
Expand All @@ -68,14 +72,16 @@ const (
maxElementLen = 5
)

// Scheduler cluster scopes
type Scopes struct {
IDC string `mapstructure:"idc"`
Location string `mapstructure:"location"`
NetTopology string `mapstructure:"net_topology"`
}

type Searcher interface {
FindSchedulerCluster(context.Context, []model.SchedulerCluster, *manager.ListSchedulersRequest) (model.SchedulerCluster, error)
// FindSchedulerClusters finds scheduler clusters that best matches the evaluation
FindSchedulerClusters(context.Context, []model.SchedulerCluster, *manager.ListSchedulersRequest) ([]model.SchedulerCluster, error)
}

type searcher struct{}
Expand All @@ -91,83 +97,113 @@ func New(pluginDir string) Searcher {
return s
}

func (s *searcher) FindSchedulerCluster(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) (model.SchedulerCluster, error) {
// FindSchedulerClusters finds scheduler clusters that best matches the evaluation
func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, client *manager.ListSchedulersRequest) ([]model.SchedulerCluster, error) {
conditions := client.HostInfo
if len(conditions) <= 0 {
return model.SchedulerCluster{}, errors.New("empty conditions")
return nil, errors.New("empty conditions")
}

if len(schedulerClusters) <= 0 {
return model.SchedulerCluster{}, errors.New("empty scheduler clusters")
return nil, errors.New("empty scheduler clusters")
}

// If there are security domain conditions, match clusters of the same security domain.
// If the security domain condition does not exist, it will match all scheduler security domains.
// Then use clusters sets to score according to scopes.
var clusters []model.SchedulerCluster
securityDomain := conditions[ConditionSecurityDomain]
if securityDomain == "" {
logger.Infof("dfdaemon %s %s have empty security domain", client.HostName, client.Ip)
clusters := FilterSchedulerClusters(conditions, schedulerClusters)
if len(clusters) == 0 {
return nil, fmt.Errorf("security domain %s does not match any scheduler cluster", conditions[ConditionSecurityDomain])
}

for _, schedulerCluster := range schedulerClusters {
if len(schedulerCluster.Schedulers) > 0 {
if securityDomain == "" {
clusters = append(clusters, schedulerCluster)
} else {
for _, securityRule := range schedulerCluster.SecurityGroup.SecurityRules {
if strings.Compare(securityRule.Domain, securityDomain) == 0 {
clusters = append(clusters, schedulerCluster)
}
}
sort.Slice(
clusters,
func(i, j int) bool {
var si, sj Scopes
if err := mapstructure.Decode(clusters[i].Scopes, &si); err != nil {
logger.Errorf("cluster %s decode scopes failed: %v", clusters[i].Name, err)
return false
}
}
}

switch len(clusters) {
case 0:
// If the security domain does not match, there is no cluster available
return model.SchedulerCluster{}, fmt.Errorf("security domain %s does not match", securityDomain)
case 1:
// If only one cluster matches the security domain, return the cluster directly
return clusters[0], nil
default:
// If there are multiple clusters matching the security domain,
// select the schuelder cluster with a higher score
var maxScore float64 = 0
result := clusters[0]
for _, cluster := range clusters {
var scopes Scopes
if err := mapstructure.Decode(cluster.Scopes, &scopes); err != nil {
logger.Infof("cluster %s decode scopes failed: %v", cluster.Name, err)
// Scopes parse failed to skip this evaluation
continue
if err := mapstructure.Decode(clusters[j].Scopes, &sj); err != nil {
logger.Errorf("cluster %s decode scopes failed: %v", clusters[i].Name, err)
return false
}

score := Evaluate(conditions, scopes)
if score > maxScore {
maxScore = score
result = cluster
return Evaluate(conditions, si, clusters[i].SecurityGroup.SecurityRules) > Evaluate(conditions, sj, clusters[j].SecurityGroup.SecurityRules)
},
)

return clusters, nil
}

// Filter the scheduler clusters that dfdaemon can be used
func FilterSchedulerClusters(conditions map[string]string, schedulerClusters []model.SchedulerCluster) []model.SchedulerCluster {
var clusters []model.SchedulerCluster
securityDomain := conditions[ConditionSecurityDomain]
for _, schedulerCluster := range schedulerClusters {
// There are no active schedulers in the scheduler cluster
if len(schedulerCluster.Schedulers) == 0 {
continue
}

// Dfdaemon security_domain does not exist, matching all scheduler clusters
if securityDomain == "" {
clusters = append(clusters, schedulerCluster)
continue
}

// Scheduler cluster is default, matching all dfdaemons
if schedulerCluster.IsDefault {
clusters = append(clusters, schedulerCluster)
continue
}

// Scheduler cluster SecurityRules does not exist, matching all dfdaemons
if len(schedulerCluster.SecurityGroup.SecurityRules) == 0 {
clusters = append(clusters, schedulerCluster)
continue
}

// If security_domain exists for dfdaemon and
// scheduler cluster SecurityRules also exists,
// then security_domain and SecurityRules are equal to match.
for _, securityRule := range schedulerCluster.SecurityGroup.SecurityRules {
if securityRule.Domain == securityDomain {
clusters = append(clusters, schedulerCluster)
}
}
return result, nil
}

return clusters
}

// Evaluate the degree of matching between scheduler cluster and dfdaemon
func Evaluate(conditions map[string]string, scopes Scopes) float64 {
return idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
func Evaluate(conditions map[string]string, scopes Scopes, securityRules []model.SecurityRule) float64 {
return securityDomainAffinityWeight*calculateSecurityDomainAffinityScore(conditions[ConditionSecurityDomain], securityRules) +
idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location) +
netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionNetTopology], scopes.NetTopology)
}

// calculateSecurityDomainAffinityScore 0.0~1.0 larger and better
func calculateSecurityDomainAffinityScore(securityDomain string, securityRules []model.SecurityRule) float64 {
if securityDomain == "" {
return minScore
}

if len(securityRules) == 0 {
return minScore

}

return maxScore
}

// calculateIDCAffinityScore 0.0~1.0 larger and better
func calculateIDCAffinityScore(dst, src string) float64 {
if dst == "" || src == "" {
return minScore
}

if strings.Compare(dst, src) == 0 {
if dst == src {
return maxScore
}

Expand All @@ -176,7 +212,7 @@ func calculateIDCAffinityScore(dst, src string) float64 {
// it gets the max score of idc.
srcElements := strings.Split(src, "|")
for _, srcElement := range srcElements {
if strings.Compare(dst, srcElement) == 0 {
if dst == srcElement {
return maxScore
}
}
Expand All @@ -190,7 +226,7 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
return minScore
}

if strings.Compare(dst, src) == 0 {
if dst == src {
return maxScore
}

Expand All @@ -206,7 +242,7 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
}

for i := 0; i < elementLen; i++ {
if strings.Compare(dstElements[i], srcElements[i]) != 0 {
if dstElements[i] != srcElements[i] {
break
}
score++
Expand Down
Loading

0 comments on commit f2031d5

Please sign in to comment.