From cdd6fa250981b5ae1481f54794e028dc7b1cff23 Mon Sep 17 00:00:00 2001
From: Yee Hing Tong
Date: Tue, 23 Jun 2020 17:53:52 -0700
Subject: [PATCH] Add a generic labeled gauge for K8s plugin resources (#137)

# TL;DR
Adds a generic labeled gauge for K8s plugin resources

## Type
- [ ] Bug Fix
- [x] Feature
- [ ] Plugin

## Are all requirements met?
- [x] Code completed
- [x] Smoke tested
- [x] Unit tests added
- [x] Code documentation added
- [x] Any pending items have an associated Issue

## Complete description
* Adds a generic utility to emit gauge metrics in the `plugin_manager`, using the labeled gauges now found in flytestdlib.
* A goroutine is now spun up that creates a `ResourceLevelMonitor` for each resource type that a plugin registers.
* Changes the existing gauge-collecting utility for FlyteWorkflow CRD objects to use the same labeled gauge.
* We may wish to combine these in the future, but since their aggregations are different, we are keeping them separate for now.

## Tracking Issue
https://github.com/lyft/flyte/issues/311

## Follow-up issue
NA
---
 .../lyft/golang_support_tools/tools.go        |   2 +-
 go.sum                                        |  18 +-
 pkg/controller/controller.go                  |  16 +-
 pkg/controller/controller_test.go             |  10 +-
 .../nodes/task/k8s/plugin_collector.go        | 182 ++++++++++++++++++
 .../nodes/task/k8s/plugin_collector_test.go   | 110 +++++++++++
 .../nodes/task/k8s/plugin_manager.go          |  63 +++++-
 .../nodes/task/k8s/plugin_manager_test.go     |  38 +++-
 pkg/controller/nodes/task/plugin_config.go    |   5 +-
 9 files changed, 395 insertions(+), 49 deletions(-)
 create mode 100644 pkg/controller/nodes/task/k8s/plugin_collector.go
 create mode 100644 pkg/controller/nodes/task/k8s/plugin_collector_test.go

diff --git a/boilerplate/lyft/golang_support_tools/tools.go b/boilerplate/lyft/golang_support_tools/tools.go
index 4310b39d7..88ff64523 100644
--- a/boilerplate/lyft/golang_support_tools/tools.go
+++ b/boilerplate/lyft/golang_support_tools/tools.go
@@ -3,8 +3,8 @@ package tools
 
 import (
+	_ "github.com/alvaroloes/enumer"
 	_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
 	_ "github.com/lyft/flytestdlib/cli/pflags"
 	_ "github.com/vektra/mockery/cmd/mockery"
-	_ "github.com/alvaroloes/enumer"
 )
diff --git a/go.sum b/go.sum
index 0c0b1f98c..38923eb31 100644
--- a/go.sum
+++ b/go.sum
@@ -391,25 +391,12 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz
 github.com/lyft/datacatalog v0.2.1 h1:W7LsAjaS297iLCtSH9ZaAAG3YPofwkbbgIaqkfdeM0o=
 github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4=
 github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
-github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
-github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
-github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0=
-github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
 github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
+github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U=
 github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
-github.com/lyft/flyteplugins v0.3.23 h1:cN6d6f1ZkoHw+HD4wFCSVFVv+sCSeyx13E+hXIYEDzo=
-github.com/lyft/flyteplugins v0.3.23/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
-github.com/lyft/flyteplugins v0.3.28 h1:4YSjJyQUHFtVoQio4X3wYtS7WRIGdJQf9Wtcl75e+1w=
-github.com/lyft/flyteplugins 
v0.3.28/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= -github.com/lyft/flyteplugins v0.3.29 h1:NN88yXv6sTouMVwQEgbP0A6k+uznGr00ZcKP6ZFUPrU= -github.com/lyft/flyteplugins v0.3.29/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU= -github.com/lyft/flyteplugins v0.3.34/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU= github.com/lyft/flyteplugins v0.3.35 h1:9s2BrJ82RoTJa1Cy02vqQy+ajxS+d4MQAkuUFoaiCuQ= github.com/lyft/flyteplugins v0.3.35/go.mod h1:Dk9rnPCbgR7rC9dNM49260TQ51TvRsljDBJ6uBjZ9ys= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= -github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE= -github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= -github.com/lyft/flytestdlib v0.3.7/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI= github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI= @@ -582,6 +569,7 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= @@ -915,7 +903,9 @@ gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 0c247f116..acd99c4e3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,6 +5,8 @@ import ( "runtime/pprof" "time" + "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flytestdlib/contextutils" "k8s.io/apimachinery/pkg/labels" @@ -193,7 +195,7 @@ type ResourceLevelMonitor struct { // System Observability: This is a labeled gauge that emits the current number of FlyteWorkflow objects in the informer. It is used // to monitor current levels. It currently only splits by project/domain, not workflow status. 
- levels *prometheus.GaugeVec + levels labeled.Gauge // The thing that we want to measure the current levels of lister lister.FlyteWorkflowLister @@ -235,15 +237,10 @@ func (r *ResourceLevelMonitor) collect(ctx context.Context) { counts := r.countList(ctx, workflows) // Emit labeled metrics, for each project/domain combination. This can be aggregated later with Prometheus queries. - metricKeys := []contextutils.Key{contextutils.ProjectKey, contextutils.DomainKey} for project, val := range counts { for domain, num := range val { tempContext := contextutils.WithProjectDomain(ctx, project, domain) - gauge, err := r.levels.GetMetricWith(contextutils.Values(tempContext, metricKeys...)) - if err != nil { - panic(err) - } - gauge.Set(float64(num)) + r.levels.Set(tempContext, float64(num)) } } } @@ -271,9 +268,8 @@ func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowL return &ResourceLevelMonitor{ Scope: scope, CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond), - levels: scope.MustNewGaugeVec("flyteworkflow", "Current FlyteWorkflow levels", - contextutils.ProjectKey.String(), contextutils.DomainKey.String()), - lister: lister, + levels: labeled.NewGauge("flyteworkflow", "Current FlyteWorkflow levels per instance of propeller", scope), + lister: lister, } } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index b2e295935..4b5d1e771 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" listers "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1" @@ -62,7 +64,7 @@ func (m mockWFLister) List(_ labels.Selector) (ret []*v1alpha1.FlyteWorkflow, er func TestResourceLevelMonitor_collect(t *testing.T) { scope := promutils.NewScope("testscope") - g := scope.MustNewGaugeVec("unittest", "testing", "project", "domain") + g := labeled.NewGauge("unittest", "testing", scope) lm := &ResourceLevelMonitor{ Scope: scope, CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond), @@ -74,10 +76,10 @@ func TestResourceLevelMonitor_collect(t *testing.T) { var expected = ` # HELP testscope:unittest testing # TYPE testscope:unittest gauge - testscope:unittest{domain="dev",project="proj"} 2 - testscope:unittest{domain="dev",project="proj2"} 1 + testscope:unittest{domain="dev",project="proj", task="",wf=""} 2 + testscope:unittest{domain="dev",project="proj2", task="",wf=""} 1 ` - err := testutil.CollectAndCompare(g, strings.NewReader(expected)) + err := testutil.CollectAndCompare(g.GaugeVec, strings.NewReader(expected)) assert.NoError(t, err) } diff --git a/pkg/controller/nodes/task/k8s/plugin_collector.go b/pkg/controller/nodes/task/k8s/plugin_collector.go new file mode 100644 index 000000000..d2344c6e1 --- /dev/null +++ b/pkg/controller/nodes/task/k8s/plugin_collector.go @@ -0,0 +1,182 @@ +package k8s + +import ( + "context" + "runtime/pprof" + "strings" + "sync" + "time" + + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flytestdlib/promutils/labeled" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils" 
+)
+
+const resourceLevelMonitorCycleDuration = 10 * time.Second
+const KindKey contextutils.Key = "kind"
+
+// This object is responsible for emitting metrics that show the current number of a given K8s resource kind, cut by namespace.
+// It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It also emits
+// a timer that measures how long each measurement cycle takes.
+type ResourceLevelMonitor struct {
+	Scope promutils.Scope
+
+	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels gauge below
+	CollectorTimer *labeled.StopWatch
+
+	// System Observability: This is a labeled gauge that emits the current number of objects in the informer. It is used
+	// to monitor current levels.
+	Levels *labeled.Gauge
+
+	// This informer will be used to get a list of the underlying objects that we want a tally of
+	sharedInformer cache.SharedIndexInformer
+
+	// The kind here will be used to differentiate all the metrics; we'll leave out group and version for now
+	gvk schema.GroupVersionKind
+
+	// We only want to create one of these ResourceLevelMonitor objects per K8s resource type. If one of these objects
+	// already exists because another plugin created it, we pass along the pointer to that older object. However, we don't
+	// want the collection goroutine to be kicked off multiple times.
+	once sync.Once
+}
+
+// We cut only by namespace because it is the one attribute we can be sure any K8s resource created by a plugin will have
+// (as yet, Flyte doesn't have any plugins that create cluster-level resources, and it probably won't for a long time). We
+// can't assume that all the operators and CRDs that Flyte will ever work with will have the exact same set of labels,
+// annotations, or owner references. The only thing we can really count on is namespace.
+func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int {
+	// Map of namespace to counts
+	counts := map[string]int{}
+
+	// Collect the object counts by namespace
+	for _, v := range objects {
+		metadata, err := meta.Accessor(v)
+		if err != nil {
+			logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err)
+			continue
+		}
+		counts[metadata.GetNamespace()]++
+	}
+
+	return counts
+}
+
+// The context here is expected to already have a value for the KindKey
+func (r *ResourceLevelMonitor) collect(ctx context.Context) {
+	// Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary
+	// information to derive project/domain
+	objects := r.sharedInformer.GetStore().List()
+	counts := r.countList(ctx, objects)
+
+	for ns, count := range counts {
+		withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
+		r.Levels.Set(withNamespaceCtx, float64(count))
+	}
+}
+
+func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
+	ticker := time.NewTicker(resourceLevelMonitorCycleDuration)
+	collectorCtx := contextutils.WithGoroutineLabel(ctx, "k8s-resource-level-monitor")
+	// Since the idea is that one of these objects is always only responsible for exactly one type of K8s resource, we
+	// can safely set the context here for that kind for all downstream usage
+	collectorCtx = context.WithValue(collectorCtx, KindKey, strings.ToLower(r.gvk.Kind))
+
+	go func() {
+		defer ticker.Stop()
+		pprof.SetGoroutineLabels(collectorCtx)
+		r.sharedInformer.HasSynced()
+		logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind)
+		for {
+			select {
+			case <-collectorCtx.Done():
+				return
+			case <-ticker.C:
+				t := r.CollectorTimer.Start(collectorCtx)
+				r.collect(collectorCtx)
+				t.Stop()
+			}
+		}
+	}()
+}
+
+func (r *ResourceLevelMonitor) RunCollectorOnce(ctx context.Context) {
+	r.once.Do(func() {
+		r.RunCollector(ctx)
+	})
+}
+
+// This struct is here to ensure that we do not create more than one of these monitors for a given GVK. It wouldn't necessarily break
+// anything, but it's a waste of compute cycles to compute counts multiple times. This can happen if multiple plugins create the same
+// underlying K8s resource type. If two plugins both created Pods (i.e. sidecar and container), without this we would launch two
+// ResourceLevelMonitors, have two goroutines spinning, etc.
+type ResourceMonitorIndex struct {
+	lock     *sync.Mutex
+	monitors map[schema.GroupVersionKind]*ResourceLevelMonitor
+
+	// These are declared here because this constructor will be called more than once, by different K8s resource types (Pods, SparkApps, OtherCrd, etc.)
+	// and metric names have to be unique. It felt more reasonable at time of writing to have one metric and have each resource type just be a label
+	// rather than one metric per type, but can revisit this down the road.
+ gauges map[promutils.Scope]*labeled.Gauge + stopwatches map[promutils.Scope]*labeled.StopWatch +} + +func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer, + gvk schema.GroupVersionKind) *ResourceLevelMonitor { + + logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind) + + r.lock.Lock() + defer r.lock.Unlock() + + if r.monitors[gvk] != nil { + logger.Infof(ctx, "Monitor for resource type %s already created, skipping...", gvk.Kind) + return r.monitors[gvk] + } + logger.Infof(ctx, "Creating monitor for resource type %s...", gvk.Kind) + + // Refer to the existing labels in main.go of this repo. For these guys, we need to add namespace and kind (the K8s resource name, pod, sparkapp, etc.) + additionalLabels := labeled.AdditionalLabelsOption{ + Labels: []string{contextutils.NamespaceKey.String(), KindKey.String()}, + } + if r.gauges[scope] == nil { + x := labeled.NewGauge("k8s_resources", "Current levels of K8s objects as seen from their informer caches", scope, additionalLabels) + r.gauges[scope] = &x + } + if r.stopwatches[scope] == nil { + x := labeled.NewStopWatch("k8s_collection_cycle", "Measures how long it takes to run a collection", + time.Millisecond, scope, additionalLabels) + r.stopwatches[scope] = &x + } + + rm := &ResourceLevelMonitor{ + Scope: scope, + CollectorTimer: r.stopwatches[scope], + Levels: r.gauges[scope], + sharedInformer: si, + gvk: gvk, + } + r.monitors[gvk] = rm + + return rm +} + +// This is a global variable to this file. At runtime, the NewResourceMonitorIndex() function should only be called once +// but can be called multiple times in unit tests. +var index *ResourceMonitorIndex + +func NewResourceMonitorIndex() *ResourceMonitorIndex { + if index == nil { + index = &ResourceMonitorIndex{ + lock: &sync.Mutex{}, + monitors: make(map[schema.GroupVersionKind]*ResourceLevelMonitor), + gauges: make(map[promutils.Scope]*labeled.Gauge), + stopwatches: make(map[promutils.Scope]*labeled.StopWatch), + } + } + return index +} diff --git a/pkg/controller/nodes/task/k8s/plugin_collector_test.go b/pkg/controller/nodes/task/k8s/plugin_collector_test.go new file mode 100644 index 000000000..2e8312f0e --- /dev/null +++ b/pkg/controller/nodes/task/k8s/plugin_collector_test.go @@ -0,0 +1,110 @@ +package k8s + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/lyft/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" +) + +var pods = []interface{}{ + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a", + Namespace: "ns-a", + }, + }, + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b", + Namespace: "ns-a", + }, + }, + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "c", + Namespace: "ns-b", + }, + }, +} + +func TestNewResourceLevelMonitor(t *testing.T) { + x := v1.Pod{} + x.GetObjectMeta() + lm := ResourceLevelMonitor{} + res := lm.countList(context.Background(), pods) + assert.Equal(t, 2, res["ns-a"]) + assert.Equal(t, 1, res["ns-b"]) +} + +type MyFakeInformer struct { + cache.SharedIndexInformer + store cache.Store +} + +func (m MyFakeInformer) GetStore() cache.Store { + return m.store +} + +func (m MyFakeInformer) HasSynced() bool { + return true +} + +type MyFakeStore struct { + cache.Store +} + +func (m MyFakeStore) 
List() []interface{} { + return pods +} + +func TestResourceLevelMonitor_collect(t *testing.T) { + ctx := context.Background() + scope := promutils.NewScope("testscope") + + kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{}) + assert.NoError(t, err) + myInformer := MyFakeInformer{ + store: MyFakeStore{}, + } + + index := NewResourceMonitorIndex() + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + rm.collect(ctx) + + var expected = ` + # HELP testscope:k8s_resources Current levels of K8s objects as seen from their informer caches + # TYPE testscope:k8s_resources gauge + testscope:k8s_resources{kind="",ns="ns-a",project=""} 2 + testscope:k8s_resources{kind="",ns="ns-b",project=""} 1 + ` + + err = testutil.CollectAndCompare(rm.Levels.GaugeVec, strings.NewReader(expected)) + assert.NoError(t, err) +} + +func TestResourceLevelMonitorSingletonness(t *testing.T) { + ctx := context.Background() + scope := promutils.NewScope("testscope") + + kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{}) + assert.NoError(t, err) + myInformer := MyFakeInformer{ + store: MyFakeStore{}, + } + + index := NewResourceMonitorIndex() + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + fmt.Println(rm) + //rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0]) + + //assert.Equal(t, rm, rm2) +} diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 42f6556a2..ac2760dc2 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -3,9 +3,15 @@ package k8s import ( "context" "fmt" + "reflect" "strings" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff" v1 "k8s.io/api/core/v1" @@ -101,7 +107,8 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor } func (e *PluginManager) GetProperties() pluginsCore.PluginProperties { @@ -367,8 +374,10 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu return nil } -func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, backOffController *backoff.Controller) (*PluginManager, error) { - mgr, err := NewPluginManager(ctx, iCtx, entry) +func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, backOffController *backoff.Controller, + monitorIndex *ResourceMonitorIndex) (*PluginManager, error) { + + mgr, err := NewPluginManager(ctx, iCtx, entry, monitorIndex) if err == nil { mgr.backOffController = backOffController } @@ -376,7 +385,7 @@ func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupCont } // Creates a K8s generic task executor. This provides an easier way to build task executors that create K8s resources. 
-func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry) (*PluginManager, error) { +func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, monitorIndex *ResourceMonitorIndex) (*PluginManager, error) { if iCtx.EnqueueOwner() == nil { return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.") } @@ -466,11 +475,47 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } + // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on + gvk, err := getPluginGvk(entry.ResourceToWatch) + if err != nil { + return nil, err + } + sharedInformer, err := getPluginSharedInformer(iCtx, entry.ResourceToWatch) + if err != nil { + return nil, err + } + rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, sharedInformer, gvk) + // Start the poller and gauge emitter + rm.RunCollectorOnce(ctx) + return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: iCtx.KubeClient(), + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: iCtx.KubeClient(), + resourceLevelMonitor: rm, }, nil } + +func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, error) { + kinds, _, err := scheme.Scheme.ObjectKinds(resourceToWatch) + if err != nil && len(kinds) == 0 { + return schema.GroupVersionKind{}, errors.Errorf(errors.PluginInitializationFailed, "No kind in schema for %v", resourceToWatch) + } + return kinds[0], nil +} + +func getPluginSharedInformer(iCtx pluginsCore.SetupContext, resourceToWatch runtime.Object) (cache.SharedIndexInformer, error) { + i, err := iCtx.KubeClient().GetCache().GetInformer(resourceToWatch) + if err != nil { + return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i)) + } + + si, casted := i.(cache.SharedIndexInformer) + if !casted { + return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. 
Actual: %v", reflect.TypeOf(i)) + } + + return si, nil +} diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 6835e53e1..c31bd5c57 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -98,7 +98,7 @@ func ExampleNewPluginManager() { RegisteredTaskTypes: []pluginsCore.TaskType{"container"}, ResourceToWatch: &v1.Pod{}, Plugin: k8sSampleHandler{}, - }) + }, NewResourceMonitorIndex()) if err == nil { fmt.Printf("Created executor: %v\n", exec.GetID()) } else { @@ -192,7 +192,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NoError(t, err) transition, err := pluginManager.Handle(ctx, tctx) @@ -220,7 +220,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -254,7 +254,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -287,7 +287,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -339,7 +339,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, backOffController) + }, backOffController, NewResourceMonitorIndex()) assert.NoError(t, err) transition, err := pluginManager.Handle(ctx, tctx) @@ -381,7 +381,7 @@ func TestPluginManager_Abort(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NotNil(t, res) assert.NoError(t, err) @@ -401,7 +401,7 @@ func TestPluginManager_Abort(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NotNil(t, res) assert.NoError(t, err) @@ -522,7 +522,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }) + }, NewResourceMonitorIndex()) assert.NotNil(t, res) assert.NoError(t, err) @@ -576,6 +576,24 @@ func TestAddObjectMetadata(t *testing.T) { assert.Equal(t, l, o.GetLabels()) } +func TestResourceManagerConstruction(t *testing.T) { + ctx := context.Background() + sCtx := &pluginsCoreMock.SetupContext{} + fakeKubeClient := mocks.NewFakeKubeClient() + sCtx.On("KubeClient").Return(fakeKubeClient) + + scope := promutils.NewScope("test:plugin_manager") + index := NewResourceMonitorIndex() + gvk, err := getPluginGvk(&v1.Pod{}) + assert.NoError(t, err) + assert.Equal(t, gvk.Kind, "Pod") + si, err := getPluginSharedInformer(sCtx, &v1.Pod{}) + assert.NotNil(t, si) + assert.NoError(t, err) + rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk) + assert.NotNil(t, rm) +} + func init() { - labeled.SetMetricKeys(contextutils.NamespaceKey) + labeled.SetMetricKeys(contextutils.ProjectKey) } diff --git a/pkg/controller/nodes/task/plugin_config.go b/pkg/controller/nodes/task/plugin_config.go index f6186ff81..2f1372010 100644 --- a/pkg/controller/nodes/task/plugin_config.go +++ 
b/pkg/controller/nodes/task/plugin_config.go @@ -40,6 +40,9 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu // Create a single backOffManager for all the plugins backOffController := backoff.NewController(ctx) + // Create a single resource monitor object for all plugins to use + monitorIndex := k8s.NewResourceMonitorIndex() + k8sPlugins := pr.GetK8sPlugins() for i := range k8sPlugins { kpe := k8sPlugins[i] @@ -52,7 +55,7 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu ID: id, RegisteredTaskTypes: kpe.RegisteredTaskTypes, LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) { - return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController) + return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex) }, IsDefault: kpe.IsDefault, })
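
As a rough, standalone sketch of the labeled-gauge pattern this change relies on (the scope name and namespace value below are made-up placeholders, not part of the patch):

```go
package main

import (
	"context"
	"fmt"

	"github.com/lyft/flytestdlib/contextutils"
	"github.com/lyft/flytestdlib/promutils"
	"github.com/lyft/flytestdlib/promutils/labeled"
)

func main() {
	// Labeled metrics require the global metric keys to be set exactly once per process
	// (propeller's main does this; the unit tests in this patch use ProjectKey).
	labeled.SetMetricKeys(contextutils.ProjectKey)

	// One gauge per scope, with namespace added as an extra label - the same pattern
	// GetOrCreateResourceLevelMonitor uses (it additionally adds a "kind" label).
	scope := promutils.NewScope("example") // placeholder scope name
	gauge := labeled.NewGauge("k8s_resources", "Current levels of K8s objects", scope,
		labeled.AdditionalLabelsOption{Labels: []string{contextutils.NamespaceKey.String()}})

	// Each collection cycle sets the gauge per namespace, as collect() does.
	ctx := contextutils.WithNamespace(context.Background(), "flytesnacks-development") // placeholder namespace
	gauge.Set(ctx, 3)

	fmt.Println("emitted example:k8s_resources with an ns label")
}
```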