From 90846cbde0d253abb0a341ba2daa1cfe3da413de Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 18 Feb 2020 10:19:44 -0800 Subject: [PATCH 1/9] Adding per namespace quota proportion cap to config. Adding a registration function to resource registrar --- .../pluginmachinery/core/resource_manager.go | 1 + go/tasks/plugins/hive/config/config.go | 17 +++++++++-------- go/tasks/plugins/hive/executor.go | 5 +++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index ed8b233b4..039535b66 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -31,6 +31,7 @@ func (r ResourceNamespace) CreateSubNamespace(namespace ResourceNamespace) Resou type ResourceRegistrar interface { RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error + RegisterResourceNamespaceQuotaProportionCap(ctx context.Context, proportionCap float64) error } // Resource Manager manages a single resource type, and each allocation is of size one diff --git a/go/tasks/plugins/hive/config/config.go b/go/tasks/plugins/hive/config/config.go index d1dd51cd1..aeba229a8 100644 --- a/go/tasks/plugins/hive/config/config.go +++ b/go/tasks/plugins/hive/config/config.go @@ -54,14 +54,15 @@ var ( // Qubole plugin configs type Config struct { - Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` - CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` - AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. 
Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` - LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` - Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` - ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` - DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` + Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` + CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` + AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` + TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` + LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` + Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` + ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` + DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` + NamespaceQuotaProportionCap float64 `json:"namespaceQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the hard cap on the maximum proportion of quotas a namespace can claim"` } // Retrieves the current config value or default. 
diff --git a/go/tasks/plugins/hive/executor.go b/go/tasks/plugins/hive/executor.go index a00398326..2cc12682c 100644 --- a/go/tasks/plugins/hive/executor.go +++ b/go/tasks/plugins/hive/executor.go @@ -2,7 +2,6 @@ package hive import ( "context" - "github.com/lyft/flytestdlib/cache" "github.com/lyft/flyteplugins/go/tasks/errors" @@ -124,13 +123,15 @@ func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *co } for clusterPrimaryLabel, clusterLimit := range resourceConfig { - logger.Infof(ctx, "Registering resource quota for cluster [%v]", clusterPrimaryLabel) + logger.Infof(ctx, "Registering resource quota ([%v]) and namespace quota cap ([%v]) for cluster [%v]", clusterPrimaryLabel) if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(clusterPrimaryLabel), clusterLimit); err != nil { logger.Errorf(ctx, "Resource quota registration for [%v] failed due to error [%v]", clusterPrimaryLabel, err) return nil, err } } + iCtx.ResourceRegistrar().RegisterResourceNamespaceQuotaProportionCap(ctx, cfg.NamespaceQuotaProportionCap) + return q, nil } From 1ce137d446bad5c1c8002d682c92e72d7015884c Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 18 Feb 2020 12:04:51 -0800 Subject: [PATCH 2/9] lint --- go/tasks/pluginmachinery/core/resource_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index 039535b66..27214ff0e 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -31,7 +31,7 @@ func (r ResourceNamespace) CreateSubNamespace(namespace ResourceNamespace) Resou type ResourceRegistrar interface { RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error - RegisterResourceNamespaceQuotaProportionCap(ctx context.Context, proportionCap float64) error + RegisterResourceNamespaceQuotaProportionCap(ctx 
context.Context, proportionCap float64) } // Resource Manager manages a single resource type, and each allocation is of size one From a9ae2a5d61bcead4effa4b5a142540544c7ef955 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 25 Feb 2020 11:33:05 -0800 Subject: [PATCH 3/9] adding resource constraints --- go/tasks/pluginmachinery/core/resource_manager.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index 27214ff0e..8a7340913 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -40,3 +40,14 @@ type ResourceManager interface { AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) (AllocationStatus, error) ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error } + + +type ResourceConstraint struct { + Value int64 +} + +type ResourceConstraintsSpec struct { + ProjectScopeResourceConstraint ResourceConstraint + NamespaceScopeResourceConstraint ResourceConstraint +} + From 477dac853f5f099601ff6cca854440f5b3d4fc64 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 25 Feb 2020 11:53:04 -0800 Subject: [PATCH 4/9] change fields from values to pointers --- go/tasks/pluginmachinery/core/resource_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index 8a7340913..693736a70 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -47,7 +47,7 @@ type ResourceConstraint struct { } type ResourceConstraintsSpec struct { - ProjectScopeResourceConstraint ResourceConstraint - NamespaceScopeResourceConstraint ResourceConstraint + ProjectScopeResourceConstraint *ResourceConstraint + NamespaceScopeResourceConstraint 
*ResourceConstraint } From 251b2b226ee30db2d15520a2a7294fa40a32ca9e Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 25 Feb 2020 13:04:36 -0800 Subject: [PATCH 5/9] refactor the signature of AllocateResource to add contraints --- go/tasks/pluginmachinery/core/resource_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index 693736a70..aaf139bff 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -37,7 +37,7 @@ type ResourceRegistrar interface { // Resource Manager manages a single resource type, and each allocation is of size one type ResourceManager interface { GetID() string - AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) (AllocationStatus, error) + AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string, constraintsSpec ResourceConstraintsSpec) (AllocationStatus, error) ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error } From 0b30b77034abebbf427cb4fd900dafe7c2533a35 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 25 Feb 2020 14:11:27 -0800 Subject: [PATCH 6/9] AllocateResource signature change --- .../core/mocks/resource_manager.go | 18 +++++++++--------- .../core/mocks/resource_registrar.go | 5 +++++ go/tasks/plugins/hive/execution_state.go | 2 +- go/tasks/plugins/hive/execution_state_test.go | 10 +++++----- tests/end_to_end.go | 2 +- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/go/tasks/pluginmachinery/core/mocks/resource_manager.go b/go/tasks/pluginmachinery/core/mocks/resource_manager.go index 7478e1399..c28036766 100644 --- a/go/tasks/pluginmachinery/core/mocks/resource_manager.go +++ b/go/tasks/pluginmachinery/core/mocks/resource_manager.go @@ -22,8 +22,8 @@ func (_m ResourceManager_AllocateResource) 
Return(_a0 core.AllocationStatus, _a1 return &ResourceManager_AllocateResource{Call: _m.Call.Return(_a0, _a1)} } -func (_m *ResourceManager) OnAllocateResource(ctx context.Context, namespace core.ResourceNamespace, allocationToken string) *ResourceManager_AllocateResource { - c := _m.On("AllocateResource", ctx, namespace, allocationToken) +func (_m *ResourceManager) OnAllocateResource(ctx context.Context, namespace core.ResourceNamespace, allocationToken string, constraintsSpec core.ResourceConstraintsSpec) *ResourceManager_AllocateResource { + c := _m.On("AllocateResource", ctx, namespace, allocationToken, constraintsSpec) return &ResourceManager_AllocateResource{Call: c} } @@ -32,20 +32,20 @@ func (_m *ResourceManager) OnAllocateResourceMatch(matchers ...interface{}) *Res return &ResourceManager_AllocateResource{Call: c} } -// AllocateResource provides a mock function with given fields: ctx, namespace, allocationToken -func (_m *ResourceManager) AllocateResource(ctx context.Context, namespace core.ResourceNamespace, allocationToken string) (core.AllocationStatus, error) { - ret := _m.Called(ctx, namespace, allocationToken) +// AllocateResource provides a mock function with given fields: ctx, namespace, allocationToken, constraintsSpec +func (_m *ResourceManager) AllocateResource(ctx context.Context, namespace core.ResourceNamespace, allocationToken string, constraintsSpec core.ResourceConstraintsSpec) (core.AllocationStatus, error) { + ret := _m.Called(ctx, namespace, allocationToken, constraintsSpec) var r0 core.AllocationStatus - if rf, ok := ret.Get(0).(func(context.Context, core.ResourceNamespace, string) core.AllocationStatus); ok { - r0 = rf(ctx, namespace, allocationToken) + if rf, ok := ret.Get(0).(func(context.Context, core.ResourceNamespace, string, core.ResourceConstraintsSpec) core.AllocationStatus); ok { + r0 = rf(ctx, namespace, allocationToken, constraintsSpec) } else { r0 = ret.Get(0).(core.AllocationStatus) } var r1 error - if rf, ok := 
ret.Get(1).(func(context.Context, core.ResourceNamespace, string) error); ok { - r1 = rf(ctx, namespace, allocationToken) + if rf, ok := ret.Get(1).(func(context.Context, core.ResourceNamespace, string, core.ResourceConstraintsSpec) error); ok { + r1 = rf(ctx, namespace, allocationToken, constraintsSpec) } else { r1 = ret.Error(1) } diff --git a/go/tasks/pluginmachinery/core/mocks/resource_registrar.go b/go/tasks/pluginmachinery/core/mocks/resource_registrar.go index c2707c49d..fdfaf11a9 100644 --- a/go/tasks/pluginmachinery/core/mocks/resource_registrar.go +++ b/go/tasks/pluginmachinery/core/mocks/resource_registrar.go @@ -14,6 +14,11 @@ type ResourceRegistrar struct { mock.Mock } +// RegisterResourceNamespaceQuotaProportionCap provides a mock function with given fields: ctx, proportionCap +func (_m *ResourceRegistrar) RegisterResourceNamespaceQuotaProportionCap(ctx context.Context, proportionCap float64) { + _m.Called(ctx, proportionCap) +} + type ResourceRegistrar_RegisterResourceQuota struct { *mock.Call } diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index 4246d1293..5be5ae043 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -162,7 +162,7 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, cur return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId) } - allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId) + allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId, core.ResourceConstraintsSpec{}) // TODO: replace it with proper values from the config if err != nil { logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. 
error %s", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err) diff --git a/go/tasks/plugins/hive/execution_state_test.go b/go/tasks/plugins/hive/execution_state_test.go index eadd7f951..c90670abf 100644 --- a/go/tasks/plugins/hive/execution_state_test.go +++ b/go/tasks/plugins/hive/execution_state_test.go @@ -176,7 +176,7 @@ func TestGetAllocationToken(t *testing.T) { tCtx := GetMockTaskExecutionContext() mockResourceManager := tCtx.ResourceManager() x := mockResourceManager.(*mocks.ResourceManager) - x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusGranted, nil) mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()} @@ -190,7 +190,7 @@ func TestGetAllocationToken(t *testing.T) { tCtx := GetMockTaskExecutionContext() mockResourceManager := tCtx.ResourceManager() x := mockResourceManager.(*mocks.ResourceManager) - x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusExhausted, nil) mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()} @@ -204,7 +204,7 @@ func TestGetAllocationToken(t *testing.T) { tCtx := GetMockTaskExecutionContext() mockResourceManager := tCtx.ResourceManager() x := mockResourceManager.(*mocks.ResourceManager) - x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
Return(core.AllocationStatusNamespaceQuotaExceeded, nil) mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()} @@ -218,7 +218,7 @@ func TestGetAllocationToken(t *testing.T) { tCtx := GetMockTaskExecutionContext() mockResourceManager := tCtx.ResourceManager() x := mockResourceManager.(*mocks.ResourceManager) - x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusNamespaceQuotaExceeded, nil) mockCurrentState := ExecutionState{} @@ -232,7 +232,7 @@ func TestGetAllocationToken(t *testing.T) { tCtx := GetMockTaskExecutionContext() mockResourceManager := tCtx.ResourceManager() x := mockResourceManager.(*mocks.ResourceManager) - x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusGranted, nil) startTime := time.Now() diff --git a/tests/end_to_end.go b/tests/end_to_end.go index 6c11a12e5..cd9c64edf 100644 --- a/tests/end_to_end.go +++ b/tests/end_to_end.go @@ -217,7 +217,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i eRecorder.OnRecordRawMatch(mock.Anything, mock.Anything).Return(nil) resourceManager := &coreMocks.ResourceManager{} - resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil) + resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil) resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) tCtx := &coreMocks.TaskExecutionContext{} From 55326217c3b61b7c60d6c2870451696bded6ed71 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Wed, 26 Feb 2020 09:12:35 -0800 Subject: [PATCH 7/9] refactored hive plugin to pass in resource 
constraints --- .../pluginmachinery/core/resource_manager.go | 1 - go/tasks/plugins/hive/config/config.go | 5 ++-- go/tasks/plugins/hive/execution_state.go | 25 ++++++++++++++++++- go/tasks/plugins/hive/executor.go | 2 -- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index aaf139bff..fe6b3daa5 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -31,7 +31,6 @@ func (r ResourceNamespace) CreateSubNamespace(namespace ResourceNamespace) Resou type ResourceRegistrar interface { RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error - RegisterResourceNamespaceQuotaProportionCap(ctx context.Context, proportionCap float64) } // Resource Manager manages a single resource type, and each allocation is of size one diff --git a/go/tasks/plugins/hive/config/config.go b/go/tasks/plugins/hive/config/config.go index aeba229a8..490447ac5 100644 --- a/go/tasks/plugins/hive/config/config.go +++ b/go/tasks/plugins/hive/config/config.go @@ -29,6 +29,8 @@ type ClusterConfig struct { PrimaryLabel string `json:"primaryLabel" pflag:",The primary label of a given service cluster"` Labels []string `json:"labels" pflag:",Labels of a given service cluster"` Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"` + ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the service cluster"` + NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the service cluster"` } type DestinationClusterConfig struct { @@ -45,7 
+47,7 @@ var ( TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", LruCacheSize: 2000, Workers: 15, - ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}}, + ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 100, ProjectScopeQuotaProportionCap:0.7, NamespaceScopeQuotaProportionCap:0.7}}, DestinationClusterConfigs: []DestinationClusterConfig{}, } @@ -62,7 +64,6 @@ type Config struct { Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` - NamespaceQuotaProportionCap float64 `json:"namespaceQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the hard cap on the maximum proportion of quotas a namespace can claim"` } // Retrieves the current config value or default. diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index 5be5ae043..f8abaf395 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -153,6 +153,27 @@ func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx c return core.ResourceNamespace(clusterPrimaryLabel), nil } +func createResourceConstraintsSpec(ctx context.Context, _ core.TaskExecutionContext, targetClusterPrimaryLabel core.ResourceNamespace) core.ResourceConstraintsSpec { + cfg := config.GetQuboleConfig() + constraintsSpec := core.ResourceConstraintsSpec{ + ProjectScopeResourceConstraint: nil, + NamespaceScopeResourceConstraint: nil, + } + if cfg.ClusterConfigs == nil { + logger.Infof(ctx, "No cluster config is found. 
Returning an empty resource constraints spec") + return constraintsSpec + } + for _, cluster := range cfg.ClusterConfigs { + if cluster.PrimaryLabel == string(targetClusterPrimaryLabel) { + constraintsSpec.ProjectScopeResourceConstraint = &core.ResourceConstraint{Value: int64(float64(cluster.Limit) * cluster.ProjectScopeQuotaProportionCap)} + constraintsSpec.NamespaceScopeResourceConstraint = &core.ResourceConstraint{Value: int64(float64(cluster.Limit) * cluster.NamespaceScopeQuotaProportionCap)} + break + } + } + logger.Infof(ctx, "Created a resource constraints spec: [%v]", constraintsSpec) + return constraintsSpec +} + func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error) { newState := ExecutionState{} uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() @@ -162,7 +183,9 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, cur return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId) } - allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId, core.ResourceConstraintsSpec{}) // TODO: replace it with proper values from the config + resourceConstraintsSpec := createResourceConstraintsSpec(ctx, tCtx, clusterPrimaryLabel) + + allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId, resourceConstraintsSpec) if err != nil { logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. 
error %s", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err) diff --git a/go/tasks/plugins/hive/executor.go b/go/tasks/plugins/hive/executor.go index 2cc12682c..3ee576265 100644 --- a/go/tasks/plugins/hive/executor.go +++ b/go/tasks/plugins/hive/executor.go @@ -130,8 +130,6 @@ func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *co } } - iCtx.ResourceRegistrar().RegisterResourceNamespaceQuotaProportionCap(ctx, cfg.NamespaceQuotaProportionCap) - return q, nil } From 286463633b8a2974d4c6cc2012e918f6b1df7693 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 3 Mar 2020 11:04:29 -0800 Subject: [PATCH 8/9] add Doc in comments --- .../pluginmachinery/core/resource_manager.go | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index fe6b3daa5..0fda675ac 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -33,10 +33,30 @@ type ResourceRegistrar interface { RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error } -// Resource Manager manages a single resource type, and each allocation is of size one +// Resource Manager manages type, and each allocation is of size one +// Flyte resource manager manages resources that Flyte components use. Generally speaking, a resource is an abstraction of anything that has a limited quota +// of units and can be claimed in a single unit or multiple units at once. At Flyte's current state, a resource means a logical separation (e.g., a cluster) +// of an external service that allows a limited number of outstanding requests to be sent to. Flyte uses a token to serve as the placeholder to represent a +// unit of resource. Flyte resource manager manages resources by managing the tokens of the resources. 
For example, Flyte has a Qubole plugin that allows Flyte +// tasks to send out Hive commands to Qubole. Here, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes a +// token of the corresponding resource. +// +// At the setup time of Flyte Propeller, a Flyte plugin needs to register the resources and the desired quota of each resource with Flyte resource manager. During +// runtime, Flyte resource manager does two simple things: allocating tokens and releasing tokens. When a Flyte task execution wants to send a request to an external +// service, first it needs to claim a unit of the corresponding resource. Specifically, an execution needs to generate a unique token, and use the token to call the +// resource manager's AllocateResource() function. Flyte resource manager will check its current utilization and the allocation policy to decide whether or not to grant +// the request. Only when receiving the "Granted" return value shall this execution move forward and send out the request. The granted token will be recorded in a token pool +// corresponding to the resource and managed by the resource manager. When the request is done, the plugin will ask the resource manager to release the token, and the token +// will be erased from the corresponding pool. type ResourceManager interface { GetID() string + // During execution time, plugins can call AllocateResource() to register a token to the token pool associated with a resource with the resource manager. + // If granted an allocation, the token will be recorded in the corresponding token pool until the same plugin releases it. + // When calling AllocateResource, the plugin needs to specify a ResourceConstraintsSpec which contains resource capping constraints at different levels. 
+ // The ResourceConstraint pointers in ResourceConstraintsSpec, however, can be set to nil to indicate that there is no constraint at that level AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string, constraintsSpec ResourceConstraintsSpec) (AllocationStatus, error) + // During execution time, after an outstanding request is completed, the plugin needs to use ReleaseResource() to release the allocation of the corresponding token + // from the token pool in order to gain back the quota taken by the token ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error } @@ -45,6 +65,10 @@ type ResourceConstraint struct { Value int64 } +// ResourceConstraintsSpec is a contract that a plugin can specify, to force quota-acquisition constraints at different levels. +// Setting constraints in a ResourceConstraintsSpec to nil objects is valid, meaning there's no constraint at the corresponding level. +// For example, a ResourceConstraintsSpec with nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint means +// that it only poses a cap at the namespace level. A zero-value ResourceConstraintsSpec means there's no constraints posed at any level. 
type ResourceConstraintsSpec struct { ProjectScopeResourceConstraint *ResourceConstraint NamespaceScopeResourceConstraint *ResourceConstraint From 53b43c445d9373cf039dba1658546c142a2dd4ef Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 3 Mar 2020 11:59:22 -0800 Subject: [PATCH 9/9] cleaning up doc of ResourceManager --- .../pluginmachinery/core/resource_manager.go | 55 ++++++++++++++----- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index 0fda675ac..0b60c6198 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -33,21 +33,44 @@ type ResourceRegistrar interface { RegisterResourceQuota(ctx context.Context, namespace ResourceNamespace, quota int) error } -// Resource Manager manages type, and each allocation is of size one -// Flyte resource manager manages resources that Flyte components use. Generally speaking, a resource is an abstraction of anything that has a limited quota -// of units and can be claimed in a single unit or multiple units at once. At Flyte's current state, a resource means a logical separation (e.g., a cluster) -// of an external service that allows a limited number of outstanding requests to be sent to. Flyte uses a token to serve as the placeholder to represent a -// unit of resource. Flyte resource manager manages resources by managing the tokens of the resources. For example, Flyte has a Qubole plugin that allows Flyte -// tasks to send out Hive commands to Qubole. Here, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes a -// token of the corresponding resource. +// ResourceManager Interface +// 1. Terms and definitions +// - Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a +// single unit or multiple units at once. 
In Flyte's current state, a resource means a logical +// separation (e.g., a cluster) of an external service that allows a limited number of outstanding +// requests to be sent to it. +// - Token: Flyte uses a token to serve as the placeholder to represent a unit of resource. Flyte resource manager +// manages resources by managing the tokens of the resources. +// 2. Description +// ResourceManager provides a task-type-specific pooling system for Flyte Tasks. Plugin writers can optionally +// request resources in their tasks, one unit at a time. +// 3. Usage +// A Flyte plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the +// setup time of Flyte Propeller. At the end of the setup time, Flyte Propeller builds a ResourceManager based on +// these registration requests. // -// At the setup time of Flyte Propeller, a Flyte plugin needs to register the resources and the desired quota of each resource with Flyte resource manager. During -// runtime, Flyte resource manager does two simple things: allocating tokens and releasing tokens. When a Flyte task execution wants to send a request to an external -// service, first it needs to claim a unit of the corresponding resource. Specifically, an execution needs to generate a unique token, and use the token to call the -// resource manager's AllocateResource() function. Flyte resource manager will check its current utilization and the allocation policy to decide whether or not to grant -// the request. Only when receiving the "Granted" return value shall this execution move forward and send out the request. The granted token will be recorded in a token pool -// corresponding to the resource and managed by the resource manager. When the request is done, the plugin will ask the resource manager to release the token, and the token -// will be erased from the corresponding pool. +// During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Flyte
When a Flyte +// task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding +// resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager +// by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and +// the allocation policy to decide whether or not to grant the request. Only when receiving the "AllocationGranted" +// status shall this execution move forward and send out the request. The granted token will be recorded in a token +// pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask +// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be +// erased from the corresponding pool. +// 4. Example +// Flyte has a built-on Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole. +// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes +// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling +// status, err := AllocateResource(ctx, , , ) +// and the de-allocation is achieved by the plugin calling +// status, err := AllocateResource(ctx, , , ) +// +// For example, +// status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{}) +// When the corresponding Hive command finishes, the plugin needs to make the following function call to release +// the corresponding token +// err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0") type ResourceManager interface { GetID() string // During execution time, plugins can call AllocateResource() to register a token to the token pool associated with a resource with the resource manager. 
@@ -65,7 +88,9 @@ type ResourceConstraint struct { Value int64 } -// ResourceConstraintsSpec is a contract that a plugin can specify, to force quota-acquisition constraints at different levels. +// ResourceConstraintsSpec is a contract that a plugin can specify with ResourceManager to force runtime quota-allocation constraints +// at different levels. +// // Setting constraints in a ResourceConstraintsSpec to nil objects is valid, meaning there's no constraint at the corresponding level. // For example, a ResourceConstraintsSpec with nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint means // that it only poses a cap at the namespace level. A zero-value ResourceConstraintsSpec means there's no constraints posed at any level.