From 8ff6f1d126d158c227ca3da130e6404b37b84a44 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Sun, 29 Dec 2019 19:59:05 -0800 Subject: [PATCH 1/5] cluster-namespaced redis set is wip; project-namespaced token is almost done --- .../resourcemanager/noop_resourcemanager.go | 4 +-- .../resourcemanager/redis_resourcemanager.go | 13 +++++---- .../task/resourcemanager/resourcemanager.go | 29 +++++++++++++++---- pkg/controller/nodes/task/taskexec_context.go | 5 ++-- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/pkg/controller/nodes/task/resourcemanager/noop_resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/noop_resourcemanager.go index 96975836f0..e415286225 100644 --- a/pkg/controller/nodes/task/resourcemanager/noop_resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/noop_resourcemanager.go @@ -11,8 +11,8 @@ type NoopResourceManagerBuilder struct { func (r *NoopResourceManagerBuilder) ResourceRegistrar(namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceRegistrar { return ResourceRegistrarProxy{ - ResourceRegistrar: r, - NamespacePrefix: namespacePrefix, + ResourceRegistrar: r, + ResourceNamespacePrefix: namespacePrefix, } } diff --git a/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go index 79e5fffe41..f5a68db38f 100644 --- a/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go @@ -29,8 +29,8 @@ type RedisResourceManagerBuilder struct { func (r *RedisResourceManagerBuilder) ResourceRegistrar(namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceRegistrar { return ResourceRegistrarProxy{ - ResourceRegistrar: r, - NamespacePrefix: namespacePrefix, + ResourceRegistrar: r, + ResourceNamespacePrefix: namespacePrefix, } } @@ -73,8 +73,10 @@ func (r *RedisResourceManagerBuilder) BuildResourceManager(ctx context.Context) // building the resources and insert them into the resource manager for namespace, quota := range r.namespacedResourcesQuotaMap { + // `namespace` contains plugin name and cluster (e.g., "qubole:default-cluster") prefixedNamespace := r.getNamespacedRedisSetKey(namespace) metrics := NewRedisResourceManagerMetrics(r.MetricsScope.NewSubScope(prefixedNamespace)) + rm.namespacedResourcesMap[namespace] = &Resource{ quota: quota, metrics: metrics, @@ -108,10 +110,11 @@ type RedisResourceManager struct { namespacedResourcesMap map[pluginCore.ResourceNamespace]*Resource } -func GetTaskResourceManager(r pluginCore.ResourceManager, namespacePrefix pluginCore.ResourceNamespace) pluginCore.ResourceManager { +func GetTaskResourceManager(r pluginCore.ResourceManager, namespacePrefix pluginCore.ResourceNamespace, allocationTokenNamespace TokenNamespace) pluginCore.ResourceManager { return Proxy{ - ResourceManager: r, - NamespacePrefix: namespacePrefix, + ResourceManager: r, + ResourceNamespacePrefix: namespacePrefix, + TokenNamespacePrefix: allocationTokenNamespace, } } diff --git a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go index 755c4bad67..7b34a20083 100644 --- a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go @@ -2,6 +2,8 @@ package resourcemanager import ( "context" + "fmt" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "sync" pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" @@ -10,6 +12,18 @@ import ( //go:generate mockery -name ResourceManager -case=underscore +type TokenNamespace string + +const tokenNamespaceSeparator = "-" + +func (t TokenNamespace) append(s string) string { + return fmt.Sprintf("%s%s%s", t, tokenNamespaceSeparator, s) +} + +func ComposeTokenNamespace(id *core.TaskExecutionIdentifier) TokenNamespace { + return TokenNamespace(id.GetTaskId().GetProject() + tokenNamespaceSeparator + id.GetTaskId().GetDomain()) +} + // This struct is designed to serve as the identifier of an user of resource manager type Resource struct { quota int @@ -26,18 +40,22 @@ type Builder interface { BuildResourceManager(ctx context.Context) (pluginCore.ResourceManager, error) } +// A proxy will be created for each TaskExecutionContext type Proxy struct { pluginCore.ResourceManager - NamespacePrefix pluginCore.ResourceNamespace + ResourceNamespacePrefix pluginCore.ResourceNamespace + TokenNamespacePrefix TokenNamespace } func (p Proxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) pluginCore.ResourceNamespace { - return p.NamespacePrefix.CreateSubNamespace(namespace) + return p.ResourceNamespacePrefix.CreateSubNamespace(namespace) } func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace, allocationToken string) (pluginCore.AllocationStatus, error) { - status, err := p.ResourceManager.AllocateResource(ctx, p.getPrefixedNamespace(namespace), allocationToken) + + namespacedAllocationToken := p.TokenNamespacePrefix.append(allocationToken) + status, err := p.ResourceManager.AllocateResource(ctx, p.getPrefixedNamespace(namespace), namespacedAllocationToken) return status, err } @@ -49,11 +67,12 @@ func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.Resourc type ResourceRegistrarProxy struct { pluginCore.ResourceRegistrar - NamespacePrefix pluginCore.ResourceNamespace + ResourceNamespacePrefix pluginCore.ResourceNamespace + TokenNamespacePrefix TokenNamespace } func (p ResourceRegistrarProxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) pluginCore.ResourceNamespace { - return p.NamespacePrefix.CreateSubNamespace(namespace) + return p.ResourceNamespacePrefix.CreateSubNamespace(namespace) } func (p ResourceRegistrarProxy) RegisterResourceQuota(ctx context.Context, namespace pluginCore.ResourceNamespace, quota int) error { diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index 783e987dbb..79a240fa0d 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -135,7 +135,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node return nil, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "unable to initialize plugin state manager") } - namespacePrefix := pluginCore.ResourceNamespace(pluginID) + resourceNamespacePrefix := pluginCore.ResourceNamespace(pluginID) return &taskExecutionContext{ NodeExecutionContext: nCtx, @@ -144,7 +144,8 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node taskExecID: taskExecutionID{execName: uniqueID, id: id}, o: nCtx.Node(), }, - rm: resourcemanager.GetTaskResourceManager(t.resourceManager, namespacePrefix), + rm: resourcemanager.GetTaskResourceManager( + t.resourceManager, resourceNamespacePrefix, resourcemanager.ComposeTokenNamespace(id)), psm: psm, tr: nCtx.TaskReader(), ow: ow, From 38341ee5a7e468f3ef2355848ead253251b8eaef Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Mon, 30 Dec 2019 11:28:08 -0800 Subject: [PATCH 2/5] Use the correct Id for composing the token namespace --- .../nodes/task/resourcemanager/resourcemanager.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go index 7b34a20083..19cf955377 100644 --- a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go @@ -14,14 +14,22 @@ import ( type TokenNamespace string +const execUrnPrefix = "ex" +const execUrnSeparator = ":" const tokenNamespaceSeparator = "-" func (t TokenNamespace) append(s string) string { return fmt.Sprintf("%s%s%s", t, tokenNamespaceSeparator, s) } +func composeExecutionUrn(id *core.TaskExecutionIdentifier) string { + return execUrnPrefix + execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetProject() + + execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetDomain() + execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetName() +} + func ComposeTokenNamespace(id *core.TaskExecutionIdentifier) TokenNamespace { - return TokenNamespace(id.GetTaskId().GetProject() + tokenNamespaceSeparator + id.GetTaskId().GetDomain()) + execUrn := composeExecutionUrn(id) // This is for the ease of debugging. Doesn't necessarily need to have this + return TokenNamespace(execUrn) } // This struct is designed to serve as the identifier of an user of resource manager From 2e0789bbe771175b0bbfae0bc0e674d1e4e86dc5 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Mon, 30 Dec 2019 12:59:14 -0800 Subject: [PATCH 3/5] go imports --- .../nodes/task/resourcemanager/redis_resourcemanager.go | 2 +- pkg/controller/nodes/task/resourcemanager/resourcemanager.go | 3 ++- pkg/controller/nodes/task/taskexec_context.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go index f5a68db38f..ffff4254e9 100644 --- a/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go @@ -114,7 +114,7 @@ func GetTaskResourceManager(r pluginCore.ResourceManager, namespacePrefix plugin return Proxy{ ResourceManager: r, ResourceNamespacePrefix: namespacePrefix, - TokenNamespacePrefix: allocationTokenNamespace, + TokenNamespacePrefix: allocationTokenNamespace, } } diff --git a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go index 19cf955377..17ecd60b00 100644 --- a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go @@ -3,9 +3,10 @@ package resourcemanager import ( "context" "fmt" - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "sync" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flytestdlib/promutils" ) diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index 79a240fa0d..b6650893b5 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -144,7 +144,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node taskExecID: taskExecutionID{execName: uniqueID, id: id}, o: nCtx.Node(), }, - rm: resourcemanager.GetTaskResourceManager( + rm: resourcemanager.GetTaskResourceManager( t.resourceManager, resourceNamespacePrefix, resourcemanager.ComposeTokenNamespace(id)), psm: psm, tr: nCtx.TaskReader(), From c0df9d05640df30cc6d5c8a940ddf3169a5dc171 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Mon, 30 Dec 2019 13:29:45 -0800 Subject: [PATCH 4/5] remove the hack that calls golangci-lint twice, as the bug seemed to have been resolved --- boilerplate/lyft/golang_test_targets/Makefile | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/boilerplate/lyft/golang_test_targets/Makefile b/boilerplate/lyft/golang_test_targets/Makefile index bfb6d7b678..4c76cdae52 100644 --- a/boilerplate/lyft/golang_test_targets/Makefile +++ b/boilerplate/lyft/golang_test_targets/Makefile @@ -8,12 +8,7 @@ DEP_SHA=1f7c19e5f52f49ffb9f956f64c010be14683468b .PHONY: lint lint: #lints the package for common code smells which golangci-lint || GO111MODULE=on go install github.com/golangci/golangci-lint/cmd/golangci-lint - # Calling lint twice here is a hack. The first call seem to fail when internally calling `go list...` - # However, that call seem to have some effects (e.g. https://github.com/golang/go/issues/29452) which, for some - # reason, allows the subsequent calls to succeed. - # TODO: Evaluate whether this is still a problem after moving admin dependency system to go modules.. -# GO111MODULE=off GL_DEBUG=linters_output,loader,env golangci-lint run --exclude deprecated -v || true - GO111MODULE=off GL_DEBUG=linters_output,loader,env golangci-lint run --deadline=5m --exclude deprecated -v + GO111MODULE=off golangci-lint run --deadline=5m --exclude deprecated -v # If code is failing goimports linter, this will fix. # skips 'vendor' From feb9edb6285d56bf1208d43c3b0f77b9dcfe478d Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Mon, 30 Dec 2019 13:50:55 -0800 Subject: [PATCH 5/5] pr comments --- .../task/resourcemanager/redis_resourcemanager.go | 9 +++++---- .../nodes/task/resourcemanager/resourcemanager.go | 14 +++++++------- pkg/controller/nodes/task/taskexec_context.go | 2 +- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go index ffff4254e9..c6b1977f30 100644 --- a/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/redis_resourcemanager.go @@ -73,7 +73,8 @@ func (r *RedisResourceManagerBuilder) BuildResourceManager(ctx context.Context) // building the resources and insert them into the resource manager for namespace, quota := range r.namespacedResourcesQuotaMap { - // `namespace` contains plugin name and cluster (e.g., "qubole:default-cluster") + // `namespace` is always prefixed with the plugin ID. Each plugin can then affix additional sub-namespaces to it to create different resource pools. + // For example, hive qubole plugin's namespaces contain plugin ID and qubole cluster (e.g., "qubole:default-cluster"). prefixedNamespace := r.getNamespacedRedisSetKey(namespace) metrics := NewRedisResourceManagerMetrics(r.MetricsScope.NewSubScope(prefixedNamespace)) @@ -110,11 +111,11 @@ type RedisResourceManager struct { namespacedResourcesMap map[pluginCore.ResourceNamespace]*Resource } -func GetTaskResourceManager(r pluginCore.ResourceManager, namespacePrefix pluginCore.ResourceNamespace, allocationTokenNamespace TokenNamespace) pluginCore.ResourceManager { +func GetTaskResourceManager(r pluginCore.ResourceManager, resourceNamespacePrefix pluginCore.ResourceNamespace, allocationTokenPrefix TokenPrefix) pluginCore.ResourceManager { return Proxy{ ResourceManager: r, - ResourceNamespacePrefix: namespacePrefix, - TokenNamespacePrefix: allocationTokenNamespace, + ResourceNamespacePrefix: resourceNamespacePrefix, + TokenPrefix: allocationTokenPrefix, } } diff --git a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go index 17ecd60b00..21f5e1deea 100644 --- a/pkg/controller/nodes/task/resourcemanager/resourcemanager.go +++ b/pkg/controller/nodes/task/resourcemanager/resourcemanager.go @@ -13,13 +13,13 @@ import ( //go:generate mockery -name ResourceManager -case=underscore -type TokenNamespace string +type TokenPrefix string const execUrnPrefix = "ex" const execUrnSeparator = ":" const tokenNamespaceSeparator = "-" -func (t TokenNamespace) append(s string) string { +func (t TokenPrefix) append(s string) string { return fmt.Sprintf("%s%s%s", t, tokenNamespaceSeparator, s) } @@ -28,9 +28,9 @@ func composeExecutionUrn(id *core.TaskExecutionIdentifier) string { execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetDomain() + execUrnSeparator + id.GetNodeExecutionId().GetExecutionId().GetName() } -func ComposeTokenNamespace(id *core.TaskExecutionIdentifier) TokenNamespace { +func ComposeTokenPrefix(id *core.TaskExecutionIdentifier) TokenPrefix { execUrn := composeExecutionUrn(id) // This is for the ease of debugging. Doesn't necessarily need to have this - return TokenNamespace(execUrn) + return TokenPrefix(execUrn) } // This struct is designed to serve as the identifier of an user of resource manager @@ -53,7 +53,7 @@ type Builder interface { type Proxy struct { pluginCore.ResourceManager ResourceNamespacePrefix pluginCore.ResourceNamespace - TokenNamespacePrefix TokenNamespace + TokenPrefix TokenPrefix } func (p Proxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) pluginCore.ResourceNamespace { @@ -63,7 +63,7 @@ func (p Proxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) plug func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.ResourceNamespace, allocationToken string) (pluginCore.AllocationStatus, error) { - namespacedAllocationToken := p.TokenNamespacePrefix.append(allocationToken) + namespacedAllocationToken := p.TokenPrefix.append(allocationToken) status, err := p.ResourceManager.AllocateResource(ctx, p.getPrefixedNamespace(namespace), namespacedAllocationToken) return status, err } @@ -77,7 +77,7 @@ func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.Resourc type ResourceRegistrarProxy struct { pluginCore.ResourceRegistrar ResourceNamespacePrefix pluginCore.ResourceNamespace - TokenNamespacePrefix TokenNamespace + TokenPrefix TokenPrefix } func (p ResourceRegistrarProxy) getPrefixedNamespace(namespace pluginCore.ResourceNamespace) pluginCore.ResourceNamespace { diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index b6650893b5..ab79848658 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -145,7 +145,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node o: nCtx.Node(), }, rm: resourcemanager.GetTaskResourceManager( - t.resourceManager, resourceNamespacePrefix, resourcemanager.ComposeTokenNamespace(id)), + t.resourceManager, resourceNamespacePrefix, resourcemanager.ComposeTokenPrefix(id)), psm: psm, tr: nCtx.TaskReader(), ow: ow,