Merge pull request flyteorg#56 from lyft/fairness-namespace-cap
Fairness in resource manager: adding support for project-level and namespace-level cap
bnsblue authored Mar 3, 2020
2 parents 9ac5bea + 53b43c4 commit f3a131a
Showing 8 changed files with 118 additions and 29 deletions.
18 changes: 9 additions & 9 deletions go/tasks/pluginmachinery/core/mocks/resource_manager.go

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions go/tasks/pluginmachinery/core/mocks/resource_registrar.go

Some generated files are not rendered by default.

64 changes: 62 additions & 2 deletions go/tasks/pluginmachinery/core/resource_manager.go
@@ -33,9 +33,69 @@ type ResourceRegistrar interface {
RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error
}

// Resource Manager manages a single resource type, and each allocation is of size one
// ResourceManager Interface
// 1. Terms and definitions
// - Resource: a resource is an abstraction of anything that has a limited quota of units and can be claimed in a
//             single unit or multiple units at once. In Flyte's current state, a resource corresponds to a logical
//             separation (e.g., a cluster) of an external service that allows only a limited number of outstanding
//             requests at a time.
// - Token: Flyte uses a token as a placeholder representing one unit of a resource. The Flyte resource manager
//          manages resources by managing their tokens.
// 2. Description
// ResourceManager provides a task-type-specific pooling system for Flyte tasks. Plugin writers can optionally
// request resources in their tasks, one unit at a time.
// 3. Usage
// A Flyte plugin registers its resources, and the desired quota of each resource, with ResourceRegistrar during
// the setup phase of Flyte Propeller. At the end of setup, Flyte Propeller builds a ResourceManager based on
// these registration requests.
//
// During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Flyte
// task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding
// resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager
// by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and
// the allocation policy to decide whether or not to grant the request. Only upon receiving the "AllocationStatusGranted"
// status should the execution move forward and send out the request. The granted token will be recorded in a token
// pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask
// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be
// erased from the corresponding pool.
// 4. Example
// Flyte has a built-in Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole.
// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes
// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling
//    status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
// and the de-allocation is achieved by the plugin calling
//    err := ReleaseResource(ctx, <cluster name>, <token string>)
//
// For example,
//    status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{})
// When the corresponding Hive command finishes, the plugin needs to make the following function call to release
// the corresponding token:
//    err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0")
type ResourceManager interface {
GetID() string
AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) (AllocationStatus, error)
// At execution time, a plugin can call AllocateResource() to register a token with the resource manager, placing it
// in the token pool associated with a resource. If the allocation is granted, the token stays recorded in the
// corresponding token pool until the same plugin releases it. When calling AllocateResource(), the plugin needs to
// specify a ResourceConstraintsSpec, which contains resource-capping constraints at different levels; each
// ResourceConstraint pointer in the spec can be set to nil to indicate no constraint at that level.
AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string, constraintsSpec ResourceConstraintsSpec) (AllocationStatus, error)
// At execution time, after an outstanding request completes, the plugin needs to call ReleaseResource() to release
// the corresponding token's allocation from the token pool, returning the quota that the token held.
ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error
}


type ResourceConstraint struct {
Value int64
}

// ResourceConstraintsSpec is a contract that a plugin can specify with ResourceManager to force runtime quota-allocation constraints
// at different levels.
//
// Setting a constraint in a ResourceConstraintsSpec to nil is valid and means there is no constraint at the
// corresponding level. For example, a ResourceConstraintsSpec with a nil ProjectScopeResourceConstraint and a
// non-nil NamespaceScopeResourceConstraint poses a cap only at the namespace level. A zero-value
// ResourceConstraintsSpec poses no constraints at any level.
type ResourceConstraintsSpec struct {
ProjectScopeResourceConstraint *ResourceConstraint
NamespaceScopeResourceConstraint *ResourceConstraint
}
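To make the contract above concrete, here is a minimal sketch of the allocate/release flow from a plugin's point of view. The helper function and the namespace/token values are hypothetical; the core types and signatures are the ones defined in this file.

```go
package example

import (
	"context"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

// runWithResource is a hypothetical helper illustrating the token protocol:
// allocate, check the status, do the work, then release.
func runWithResource(ctx context.Context, rm core.ResourceManager) error {
	namespace := core.ResourceNamespace("default_cluster") // example value
	token := "flkgiwd13-akjdoe-0"                          // example value

	// Cap allocations at the namespace level only; nil means no constraint.
	spec := core.ResourceConstraintsSpec{
		ProjectScopeResourceConstraint:   nil,
		NamespaceScopeResourceConstraint: &core.ResourceConstraint{Value: 70},
	}

	status, err := rm.AllocateResource(ctx, namespace, token, spec)
	if err != nil {
		return err
	}
	if status != core.AllocationStatusGranted {
		// Quota is exhausted or capped; the caller should retry later.
		return nil
	}

	// ... send the outstanding request to the external service here ...

	// Release the token so its quota returns to the pool.
	return rm.ReleaseResource(ctx, namespace, token)
}
```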

20 changes: 11 additions & 9 deletions go/tasks/plugins/hive/config/config.go
@@ -29,6 +29,8 @@ type ClusterConfig struct {
PrimaryLabel string `json:"primaryLabel" pflag:",The primary label of a given service cluster"`
Labels []string `json:"labels" pflag:",Labels of a given service cluster"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"`
ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the service cluster"`
NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the service cluster"`
}

type DestinationClusterConfig struct {
@@ -45,7 +47,7 @@ var (
TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN",
LruCacheSize: 2000,
Workers: 15,
ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}},
ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 100, ProjectScopeQuotaProportionCap: 0.7, NamespaceScopeQuotaProportionCap: 0.7}},
DestinationClusterConfigs: []DestinationClusterConfig{},
}

@@ -54,14 +56,14 @@ var (

// Qubole plugin configs
type Config struct {
Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"`
CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."`
AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."`
TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list of configs specifying the destination service cluster for (project, domain)"`
Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"`
CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."`
AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."`
TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list of configs specifying the destination service cluster for (project, domain)"`
}

// Retrieves the current config value or default.
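As a usage sketch, a deployment overriding the default cluster configs might declare an entry like the one below. The cluster labels and numeric values are hypothetical; the fields are the ones defined in ClusterConfig above.

```go
package example

import (
	hiveconfig "github.com/lyft/flyteplugins/go/tasks/plugins/hive/config"
)

// Hypothetical cluster entry: 50 outstanding requests overall, with at most
// 50%*50 = 25 held by any one project and 30%*50 = 15 by any one namespace.
var clusterConfigs = []hiveconfig.ClusterConfig{{
	PrimaryLabel:                     "secondary",
	Labels:                           []string{"secondary", "batch"},
	Limit:                            50,
	ProjectScopeQuotaProportionCap:   0.5,
	NamespaceScopeQuotaProportionCap: 0.3,
}}
```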
25 changes: 24 additions & 1 deletion go/tasks/plugins/hive/execution_state.go
@@ -153,6 +153,27 @@ func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx c
return core.ResourceNamespace(clusterPrimaryLabel), nil
}

func createResourceConstraintsSpec(ctx context.Context, _ core.TaskExecutionContext, targetClusterPrimaryLabel core.ResourceNamespace) core.ResourceConstraintsSpec {
cfg := config.GetQuboleConfig()
constraintsSpec := core.ResourceConstraintsSpec{
ProjectScopeResourceConstraint: nil,
NamespaceScopeResourceConstraint: nil,
}
if cfg.ClusterConfigs == nil {
logger.Infof(ctx, "No cluster config is found. Returning an empty resource constraints spec")
return constraintsSpec
}
for _, cluster := range cfg.ClusterConfigs {
if cluster.PrimaryLabel == string(targetClusterPrimaryLabel) {
constraintsSpec.ProjectScopeResourceConstraint = &core.ResourceConstraint{Value: int64(float64(cluster.Limit) * cluster.ProjectScopeQuotaProportionCap)}
constraintsSpec.NamespaceScopeResourceConstraint = &core.ResourceConstraint{Value: int64(float64(cluster.Limit) * cluster.NamespaceScopeQuotaProportionCap)}
break
}
}
logger.Infof(ctx, "Created a resource constraints spec: [%v]", constraintsSpec)
return constraintsSpec
}

func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error) {
newState := ExecutionState{}
uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
@@ -162,7 +183,9 @@
return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId)
}

allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId)
resourceConstraintsSpec := createResourceConstraintsSpec(ctx, tCtx, clusterPrimaryLabel)

allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId, resourceConstraintsSpec)
if err != nil {
logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err)
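For a concrete sense of the spec this function produces: with the default cluster config above (Limit 100 and both proportion caps 0.7), both constraints come out to 70 tokens. A standalone sketch of the same arithmetic, assuming those defaults:

```go
package main

import "fmt"

func main() {
	// Default Qubole cluster config values (see config.go above).
	limit := 100
	projectCap, namespaceCap := 0.7, 0.7

	// The same computation createResourceConstraintsSpec performs.
	projectValue := int64(float64(limit) * projectCap)     // 70
	namespaceValue := int64(float64(limit) * namespaceCap) // 70

	// At most 70 of the cluster's 100 tokens may be held by any single
	// project, and at most 70 by any single namespace, at any one time.
	fmt.Println(projectValue, namespaceValue) // prints: 70 70
}
```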
10 changes: 5 additions & 5 deletions go/tasks/plugins/hive/execution_state_test.go
@@ -176,7 +176,7 @@ func TestGetAllocationToken(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusGranted, nil)

mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
@@ -190,7 +190,7 @@
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusExhausted, nil)

mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
@@ -204,7 +204,7 @@
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusNamespaceQuotaExceeded, nil)

mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
@@ -218,7 +218,7 @@
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusNamespaceQuotaExceeded, nil)

mockCurrentState := ExecutionState{}
@@ -232,7 +232,7 @@
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusGranted, nil)

startTime := time.Now()
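These test updates are mechanical: AllocateResource now takes four arguments, so each expectation gains a fourth mock.Anything. A test that wants to be stricter could pin the new argument's type with testify's AnythingOfType; the exact type string below is an assumption about how the spec's type name renders, and the surrounding scaffolding is as in the tests above.

```go
// Sketch: match the new fourth argument by type instead of mock.Anything.
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything,
	mock.AnythingOfType("core.ResourceConstraintsSpec")).
	Return(core.AllocationStatusGranted, nil)
```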
3 changes: 1 addition & 2 deletions go/tasks/plugins/hive/executor.go
@@ -2,7 +2,6 @@ package hive

import (
"context"

"github.com/lyft/flytestdlib/cache"

"github.com/lyft/flyteplugins/go/tasks/errors"
@@ -124,7 +123,7 @@ func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *co
}

for clusterPrimaryLabel, clusterLimit := range resourceConfig {
logger.Infof(ctx, "Registering resource quota for cluster [%v]", clusterPrimaryLabel)
logger.Infof(ctx, "Registering resource quota ([%v]) and namespace quota cap ([%v]) for cluster [%v]", clusterPrimaryLabel)
if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(clusterPrimaryLabel), clusterLimit); err != nil {
logger.Errorf(ctx, "Resource quota registration for [%v] failed due to error [%v]", clusterPrimaryLabel, err)
return nil, err
2 changes: 1 addition & 1 deletion tests/end_to_end.go
@@ -217,7 +217,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
eRecorder.OnRecordRawMatch(mock.Anything, mock.Anything).Return(nil)

resourceManager := &coreMocks.ResourceManager{}
resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil)
resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil)
resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

tCtx := &coreMocks.TaskExecutionContext{}
