Skip to content

Commit

Permalink
Merge pull request flyteorg#50 from lyft/namespaced-resource-mgr
Browse files Browse the repository at this point in the history
Cluster-namespaced redis set and project-namespaced allocation token [resource mgr]
  • Loading branch information
bnsblue authored Jan 2, 2020
2 parents 66b40c7 + feb9edb commit 0ebaa33
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type NoopResourceManagerBuilder struct {

func (r *NoopResourceManagerBuilder) ResourceRegistrar(namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceRegistrar {
return ResourceRegistrarProxy{
ResourceRegistrar: r,
NamespacePrefix: namespacePrefix,
ResourceRegistrar: r,
ResourceNamespacePrefix: namespacePrefix,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type RedisResourceManagerBuilder struct {

func (r *RedisResourceManagerBuilder) ResourceRegistrar(namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceRegistrar {
return ResourceRegistrarProxy{
ResourceRegistrar: r,
NamespacePrefix: namespacePrefix,
ResourceRegistrar: r,
ResourceNamespacePrefix: namespacePrefix,
}
}

Expand Down Expand Up @@ -73,8 +73,11 @@ func (r *RedisResourceManagerBuilder) BuildResourceManager(ctx context.Context)

// building the resources and insert them into the resource manager
for namespace, quota := range r.namespacedResourcesQuotaMap {
// `namespace` is always prefixed with the plugin ID. Each plugin can then affix additional sub-namespaces to it to create different resource pools.
// For example, hive qubole plugin's namespaces contain plugin ID and qubole cluster (e.g., "qubole:default-cluster").
prefixedNamespace := r.getNamespacedRedisSetKey(namespace)
metrics := NewRedisResourceManagerMetrics(r.MetricsScope.NewSubScope(prefixedNamespace))

rm.namespacedResourcesMap[namespace] = &Resource{
quota: quota,
metrics: metrics,
Expand Down Expand Up @@ -108,10 +111,11 @@ type RedisResourceManager struct {
namespacedResourcesMap map[pluginCore.ResourceNamespace]*Resource
}

func GetTaskResourceManager(r pluginCore.ResourceManager, namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceManager {
func GetTaskResourceManager(r pluginCore.ResourceManager, resourceNamespacePrefix pluginCore.ResourceNamespace, allocationTokenPrefix TokenPrefix) pluginCore.ResourceManager {
return Proxy{
ResourceManager: r,
NamespacePrefix: namespacePrefix,
ResourceManager: r,
ResourceNamespacePrefix: resourceNamespacePrefix,
TokenPrefix: allocationTokenPrefix,
}
}

Expand Down
38 changes: 33 additions & 5 deletions pkg/controller/nodes/task/resourcemanager/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,37 @@ package resourcemanager

import (
"context"
"fmt"
"sync"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytestdlib/promutils"
)

//go:generate mockery -name ResourceManager -case=underscore

type TokenPrefix string

const execUrnPrefix = "ex"
const execUrnSeparator = ":"
const tokenNamespaceSeparator = "-"

func (t TokenPrefix) append(s string) string {
return fmt.Sprintf("%s%s%s", t, tokenNamespaceSeparator, s)
}

func composeExecutionUrn(id *core.TaskExecutionIdentifier) string {
return execUrnPrefix + execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetProject() +
execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetDomain() + execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetName()
}

func ComposeTokenPrefix(id *core.TaskExecutionIdentifier) TokenPrefix {
execUrn := composeExecutionUrn(id) // This is for the ease of debugging. Doesn't necessarily need to have this
return TokenPrefix(execUrn)
}

// This struct is designed to serve as the identifier of an user of resource manager
type Resource struct {
quota int
Expand All @@ -26,18 +49,22 @@ type Builder interface {
BuildResourceManager(ctx context.Context) (pluginCore.ResourceManager, error)
}

// A proxy will be created for each TaskExecutionContext
type Proxy struct {
pluginCore.ResourceManager
NamespacePrefix pluginCore.ResourceNamespace
ResourceNamespacePrefix pluginCore.ResourceNamespace
TokenPrefix TokenPrefix
}

func (p Proxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) pluginCore.ResourceNamespace {
return p.NamespacePrefix.CreateSubNamespace(namespace)
return p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
}

func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace,
allocationToken string) (pluginCore.AllocationStatus, error) {
status, err := p.ResourceManager.AllocateResource(ctx, p.getPrefixedNamespace(namespace), allocationToken)

namespacedAllocationToken := p.TokenPrefix.append(allocationToken)
status, err := p.ResourceManager.AllocateResource(ctx, p.getPrefixedNamespace(namespace), namespacedAllocationToken)
return status, err
}

Expand All @@ -49,11 +76,12 @@ func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.Resourc

type ResourceRegistrarProxy struct {
pluginCore.ResourceRegistrar
NamespacePrefix pluginCore.ResourceNamespace
ResourceNamespacePrefix pluginCore.ResourceNamespace
TokenPrefix TokenPrefix
}

func (p ResourceRegistrarProxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) pluginCore.ResourceNamespace {
return p.NamespacePrefix.CreateSubNamespace(namespace)
return p.ResourceNamespacePrefix.CreateSubNamespace(namespace)
}

func (p ResourceRegistrarProxy) RegisterResourceQuota(ctx context.Context, namespace pluginCore.ResourceNamespace, quota int) error {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
return nil, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "unable to initialize plugin state manager")
}

namespacePrefix := pluginCore.ResourceNamespace(pluginID)
resourceNamespacePrefix := pluginCore.ResourceNamespace(pluginID)

return &taskExecutionContext{
NodeExecutionContext: nCtx,
Expand All @@ -144,7 +144,8 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
taskExecID: taskExecutionID{execName: uniqueID, id: id},
o: nCtx.Node(),
},
rm: resourcemanager.GetTaskResourceManager(t.resourceManager, namespacePrefix),
rm: resourcemanager.GetTaskResourceManager(
t.resourceManager, resourceNamespacePrefix, resourcemanager.ComposeTokenPrefix(id)),
psm: psm,
tr: nCtx.TaskReader(),
ow: ow,
Expand Down

0 comments on commit 0ebaa33

Please sign in to comment.