Merge pull request flyteorg#56 from lyft/fairness-namespace-cap
Fairness in resource manager: adding support for project-level and namespace-level cap
bnsblue authored Mar 3, 2020
2 parents 6ec719b + 7243f33 commit 704b8dd
Showing 8 changed files with 118 additions and 29 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 62 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go
@@ -33,9 +33,69 @@ type ResourceRegistrar interface {
RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error
}

// Resource Manager manages a single resource type, and each allocation is of size one
// ResourceManager Interface
// 1. Terms and definitions
// - Resource: an abstraction of anything that has a limited quota of units and can be claimed one unit or
// several units at a time. In Flyte's current state, a resource is a logical partition (e.g., a cluster)
// of an external service that accepts only a limited number of outstanding requests.
// - Token: Flyte uses a token as a placeholder for one unit of a resource. Flyte's resource manager
// manages resources by managing the tokens of those resources.
// 2. Description
// ResourceManager provides a task-type-specific pooling system for Flyte Tasks. Plugin writers can optionally
// request resources in their tasks, one unit at a time.
// 3. Usage
// A Flyte plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the
// setup time of Flyte Propeller. At the end of the setup time, Flyte Propeller builds a ResourceManager based on
// these registration requests.
//
// During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Flyte
// task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding
// resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager
// by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and
// the allocation policy to decide whether or not to grant the request. Only when receiving the "AllocationGranted"
// status shall this execution move forward and send out the request. The granted token will be recorded in a token
// pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask
// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be
// erased from the corresponding pool.
// 4. Example
// Flyte has a built-in Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole.
// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes
// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling
// status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
// and the de-allocation is achieved by the plugin calling
// err := ReleaseResource(ctx, <cluster name>, <token string>)
//
// For example,
// status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{})
// When the corresponding Hive command finishes, the plugin needs to make the following function call to release
// the corresponding token
// err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0")
type ResourceManager interface {
GetID() string
AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) (AllocationStatus, error)
// During execution time, plugins can call AllocateResource() to register a token with the resource manager, in the token pool associated with a resource.
// If the allocation is granted, the token is recorded in the corresponding token pool until the same plugin releases it.
// When calling AllocateResource, the plugin needs to specify a ResourceConstraintsSpec, which contains resource capping constraints at different levels.
// The ResourceConstraint pointers in ResourceConstraintsSpec, however, can be set to nil to indicate that there is no constraint at that level.
AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string, constraintsSpec ResourceConstraintsSpec) (AllocationStatus, error)
// During execution time, after an outstanding request is completed, the plugin needs to call ReleaseResource() to release the corresponding token
// from the token pool and regain the quota the token occupied.
ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error
}


type ResourceConstraint struct {
Value int64
}

// ResourceConstraintsSpec is a contract that a plugin can specify with ResourceManager to enforce runtime quota-allocation constraints
// at different levels.
//
// Setting a constraint in a ResourceConstraintsSpec to nil is valid and means there is no constraint at the corresponding level.
// For example, a ResourceConstraintsSpec with a nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint
// imposes a cap only at the namespace level. A zero-value ResourceConstraintsSpec imposes no constraint at any level.
type ResourceConstraintsSpec struct {
ProjectScopeResourceConstraint *ResourceConstraint
NamespaceScopeResourceConstraint *ResourceConstraint
}
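
The allocate/release flow described in the interface comment can be illustrated with a small in-memory token pool. This is only a sketch under stated assumptions, not the resource manager that this commit wires in: `tokenPool`, `owner`, the `status` values, and the explicit `project` and `namespace` arguments are all hypothetical, and the diff does not show how the real implementation attributes a token to a project or namespace. Only `ResourceConstraint` and `ResourceConstraintsSpec` are copied from the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// ResourceConstraint and ResourceConstraintsSpec mirror the types in the diff.
type ResourceConstraint struct {
	Value int64
}

type ResourceConstraintsSpec struct {
	ProjectScopeResourceConstraint   *ResourceConstraint
	NamespaceScopeResourceConstraint *ResourceConstraint
}

// status values are hypothetical and exist only for this sketch.
type status string

const (
	statusGranted     status = "granted"
	statusExhausted   status = "exhausted"
	statusCapExceeded status = "cap exceeded"
)

// owner records who holds a token (an assumption of this sketch).
type owner struct{ project, namespace string }

// tokenPool is a hypothetical in-memory pool for a single resource.
type tokenPool struct {
	mu     sync.Mutex
	quota  int
	tokens map[string]owner
}

func newTokenPool(quota int) *tokenPool {
	return &tokenPool{quota: quota, tokens: make(map[string]owner)}
}

// allocate grants the token if the pool has room and no non-nil cap is hit.
func (p *tokenPool) allocate(token, project, namespace string, spec ResourceConstraintsSpec) status {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, held := p.tokens[token]; held {
		return statusGranted // re-requesting a held token changes nothing
	}
	if len(p.tokens) >= p.quota {
		return statusExhausted
	}
	var inProject, inNamespace int64
	for _, o := range p.tokens {
		if o.project == project {
			inProject++
		}
		if o.namespace == namespace {
			inNamespace++
		}
	}
	// A nil constraint means "no cap at this level".
	if c := spec.ProjectScopeResourceConstraint; c != nil && inProject >= c.Value {
		return statusCapExceeded
	}
	if c := spec.NamespaceScopeResourceConstraint; c != nil && inNamespace >= c.Value {
		return statusCapExceeded
	}
	p.tokens[token] = owner{project: project, namespace: namespace}
	return statusGranted
}

// release erases the token from the pool, freeing its unit of quota.
func (p *tokenPool) release(token string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.tokens, token)
}

func main() {
	pool := newTokenPool(3)
	spec := ResourceConstraintsSpec{ProjectScopeResourceConstraint: &ResourceConstraint{Value: 2}}
	fmt.Println(pool.allocate("t1", "proj-a", "ns-1", spec))
	fmt.Println(pool.allocate("t2", "proj-a", "ns-2", spec))
	fmt.Println(pool.allocate("t3", "proj-a", "ns-3", spec)) // project cap of 2 already reached
	pool.release("t1")
	fmt.Println(pool.allocate("t3", "proj-a", "ns-3", spec))
}
```

In this sketch the pool-wide quota is checked before the per-scope caps; that ordering is a design choice made here for illustration, not something the diff specifies.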

20 changes: 11 additions & 9 deletions flyteplugins/go/tasks/plugins/hive/config/config.go
@@ -29,6 +29,8 @@ type ClusterConfig struct {
PrimaryLabel string `json:"primaryLabel" pflag:",The primary label of a given service cluster"`
Labels []string `json:"labels" pflag:",Labels of a given service cluster"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"`
ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the service cluster"`
NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the service cluster"`
}
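
As a rough illustration of how the two new fields might appear in a cluster config, the sketch below decodes a JSON fragment using the JSON tags shown above. The `parseClusterConfig` helper and its range check are hypothetical; the diff only documents the 0-to-1 range in the flag description and does not show any validation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClusterConfig copies the fields and JSON tags from the diff (pflag tags omitted).
type ClusterConfig struct {
	PrimaryLabel                     string   `json:"primaryLabel"`
	Labels                           []string `json:"labels"`
	Limit                            int      `json:"limit"`
	ProjectScopeQuotaProportionCap   float64  `json:"projectScopeQuotaProportionCap"`
	NamespaceScopeQuotaProportionCap float64  `json:"namespaceScopeQuotaProportionCap"`
}

// checkProportion is a hypothetical helper enforcing the documented 0..1 range.
func checkProportion(name string, v float64) error {
	if v < 0 || v > 1 {
		return fmt.Errorf("%s must be between 0 and 1, got %v", name, v)
	}
	return nil
}

// parseClusterConfig is a hypothetical helper: decode, then validate both caps.
func parseClusterConfig(raw string) (ClusterConfig, error) {
	var c ClusterConfig
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return c, err
	}
	if err := checkProportion("projectScopeQuotaProportionCap", c.ProjectScopeQuotaProportionCap); err != nil {
		return c, err
	}
	if err := checkProportion("namespaceScopeQuotaProportionCap", c.NamespaceScopeQuotaProportionCap); err != nil {
		return c, err
	}
	return c, nil
}

func main() {
	raw := `{
		"primaryLabel": "default",
		"labels": ["default"],
		"limit": 100,
		"projectScopeQuotaProportionCap": 0.7,
		"namespaceScopeQuotaProportionCap": 0.7
	}`
	c, err := parseClusterConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```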

type DestinationClusterConfig struct {
@@ -45,7 +47,7 @@ var (
TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN",
LruCacheSize: 2000,
Workers: 15,
ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}},
ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 100, ProjectScopeQuotaProportionCap: 0.7, NamespaceScopeQuotaProportionCap: 0.7}},
DestinationClusterConfigs: []DestinationClusterConfig{},
}

@@ -54,14 +56,14 @@ var (

// Qubole plugin configs
type Config struct {
Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"`
CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."`
AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."`
TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"`
Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"`
CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."`
AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."`
TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"`
}

// Retrieves the current config value or default.
25 changes: 24 additions & 1 deletion flyteplugins/go/tasks/plugins/hive/execution_state.go
@@ -153,6 +153,27 @@ func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx c
return core.ResourceNamespace(clusterPrimaryLabel), nil
}

func createResourceConstraintsSpec(ctx context.Context, _ core.TaskExecutionContext, targetClusterPrimaryLabel core.ResourceNamespace) core.ResourceConstraintsSpec {
cfg := config.GetQuboleConfig()
constraintsSpec := core.ResourceConstraintsSpec{
ProjectScopeResourceConstraint: nil,
NamespaceScopeResourceConstraint: nil,
}
if cfg.ClusterConfigs == nil {
logger.Infof(ctx, "No cluster config is found. Returning an empty resource constraints spec")
return constraintsSpec
}
for _, cluster := range cfg.ClusterConfigs {
if cluster.PrimaryLabel == string(targetClusterPrimaryLabel) {
constraintsSpec.ProjectScopeResourceConstraint = &core.ResourceConstraint{Value: int64(float64(cluster.Limit) * cluster.ProjectScopeQuotaProportionCap)}
constraintsSpec.NamespaceScopeResourceConstraint = &core.ResourceConstraint{Value: int64(float64(cluster.Limit) * cluster.NamespaceScopeQuotaProportionCap)}
break
}
}
logger.Infof(ctx, "Created a resource constraints spec: [%v]", constraintsSpec)
return constraintsSpec
}
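
The cap arithmetic in createResourceConstraintsSpec can be checked in isolation. The sketch below repeats the same expression, `int64(float64(limit) * proportion)`, in a hypothetical helper named `capFromProportion`; the helper name is an assumption, while the expression and the default values (Limit 100, caps 0.7) come from the diff.

```go
package main

import "fmt"

// capFromProportion is a hypothetical helper that repeats the expression used
// in createResourceConstraintsSpec: the product is truncated toward zero.
func capFromProportion(limit int, proportion float64) int64 {
	return int64(float64(limit) * proportion)
}

func main() {
	// Defaults from the diff: Limit 100 with a proportion cap of 0.7.
	fmt.Println(capFromProportion(100, 0.7)) // 70

	// A fractional product is truncated, not rounded: 10 * 0.25 = 2.5 becomes 2.
	fmt.Println(capFromProportion(10, 0.25)) // 2

	// A proportion left at its zero value yields a cap of 0.
	fmt.Println(capFromProportion(100, 0)) // 0
}
```

Because the conversion truncates, a small limit combined with a small proportion can produce a cap of 0. How the resource manager treats a zero cap is not shown in this diff, so that case may be worth checking before setting the proportions.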

func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error) {
newState := ExecutionState{}
uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
@@ -162,7 +183,9 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, cur
return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId)
}

allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId)
resourceConstraintsSpec := createResourceConstraintsSpec(ctx, tCtx, clusterPrimaryLabel)

allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId, resourceConstraintsSpec)
if err != nil {
logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err)
10 changes: 5 additions & 5 deletions flyteplugins/go/tasks/plugins/hive/execution_state_test.go
@@ -176,7 +176,7 @@ func TestGetAllocationToken(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusGranted, nil)

mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
@@ -190,7 +190,7 @@ func TestGetAllocationToken(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusExhausted, nil)

mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
@@ -204,7 +204,7 @@ func TestGetAllocationToken(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusNamespaceQuotaExceeded, nil)

mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
@@ -218,7 +218,7 @@ func TestGetAllocationToken(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusNamespaceQuotaExceeded, nil)

mockCurrentState := ExecutionState{}
@@ -232,7 +232,7 @@ func TestGetAllocationToken(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusGranted, nil)

startTime := time.Now()
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/plugins/hive/executor.go
@@ -2,7 +2,6 @@ package hive

import (
"context"

"github.com/lyft/flytestdlib/cache"

"github.com/lyft/flyteplugins/go/tasks/errors"
@@ -124,7 +123,7 @@ func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *co
}

for clusterPrimaryLabel, clusterLimit := range resourceConfig {
logger.Infof(ctx, "Registering resource quota for cluster [%v]", clusterPrimaryLabel)
logger.Infof(ctx, "Registering resource quota ([%v]) for cluster [%v]", clusterLimit, clusterPrimaryLabel)
if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(clusterPrimaryLabel), clusterLimit); err != nil {
logger.Errorf(ctx, "Resource quota registration for [%v] failed due to error [%v]", clusterPrimaryLabel, err)
return nil, err
2 changes: 1 addition & 1 deletion flyteplugins/tests/end_to_end.go
@@ -217,7 +217,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
eRecorder.OnRecordRawMatch(mock.Anything, mock.Anything).Return(nil)

resourceManager := &coreMocks.ResourceManager{}
resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil)
resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil)
resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

tCtx := &coreMocks.TaskExecutionContext{}